Replace StreamBatcher with StreamEventAggregator instead in the StreamSocketSession #57

aleksandar-apostolov wants to merge 14 commits into develop

Conversation
Replace the batching approach in StreamSocketSession with an adaptive event aggregator that switches between individual and aggregated delivery based on traffic volume.

- Low traffic: events pass through individually (no overhead).
- Spike: events are grouped by type into StreamAggregatedEvent, enabling product SDKs to apply one state update instead of thousands.
- Architecture: two decoupled coroutines (collector + dispatcher) with a configurable threshold, a maxWindow ceiling, and a bounded dispatch queue.

BREAKING CHANGE: StreamSocketConfig batch params replaced with aggregation params. StreamComponentProvider.batcher replaced with eventAggregator.
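The adaptive decision described above can be sketched as follows. This is illustrative only; `dispatchAdaptive` and its parameters are hypothetical stand-ins, not the SDK API:

```kotlin
// Sketch of the adaptive delivery decision: below the threshold each event is
// delivered individually; at or above it, the drained buffer is grouped by
// type into a single aggregated dispatch.
fun <T> dispatchAdaptive(
    buffer: List<T>,
    threshold: Int,
    typeOf: (T) -> String,
    onIndividual: (T) -> Unit,
    onAggregated: (Map<String, List<T>>) -> Unit,
) {
    if (buffer.size < threshold) {
        buffer.forEach(onIndividual)          // low traffic: zero overhead
    } else {
        onAggregated(buffer.groupBy(typeOf))  // spike: one dispatch per window
    }
}

fun main() {
    val spike = List(10_000) { i -> "message.new#$i" }
    var calls = 0
    dispatchAdaptive(
        buffer = spike,
        threshold = 10,
        typeOf = { it.substringBefore('#') },
        onIndividual = { calls++ },
        onAggregated = { calls++ },
    )
    check(calls == 1) // 10K events collapse into a single aggregated dispatch
}
```

`groupBy` returns a `LinkedHashMap`, so each per-type list preserves arrival order, matching the `@property events` contract quoted later in the review.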
Walkthrough

Replaced the library's message batching component with an event aggregation system.
Sequence Diagram

```mermaid
sequenceDiagram
    actor User
    participant SocketSession as Socket<br/>Session
    participant Aggregator as Event<br/>Aggregator
    participant Buffer as Buffered<br/>State
    participant Handler as Event<br/>Handler
    participant Listener as Event<br/>Listener
    User->>SocketSession: WebSocket message arrives
    SocketSession->>Aggregator: offer(raw)
    Aggregator->>Buffer: Enqueue raw event
    alt Low Traffic (Below Threshold)
        Buffer->>Aggregator: Deserialize single event
        Aggregator->>Handler: Individual event T
        Handler->>Listener: onEvent(T)
    else Spike (≥ Threshold)
        Buffer->>Aggregator: Deserialize & group by type
        Aggregator->>Handler: StreamAggregatedEvent<Map<Type, List<T>>>
        Handler->>Listener: onEvent(StreamAggregatedEvent)
    else Timeout (Max Window)
        Buffer->>Aggregator: maxWindowMs elapsed
        Aggregator->>Handler: Dispatch buffered events
        Handler->>Listener: onEvent(...)
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~55 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
PR checklist ✅ — All required conditions are satisfied.
🎉 Great job! This PR is ready for review.
StreamBatcher with StreamEventAggregator instead
…alizers

Wrap typeExtractor and deserializer calls in try-catch so a throwing function doesn't kill the collector coroutine. Previously only Result.failure was handled — a thrown exception would stop all event processing.

Also adds 6 edge case tests:
- typeExtractor throws → collector survives
- deserializer throws (not Result.failure) → collector survives
- handler throws → subsequent events still delivered
- exactly-at-threshold boundary → triggers aggregation
- all events fail deserialization → batch dropped, no crash
- dispatch queue full → events dropped with warning, no crash
Actionable comments posted: 13
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt (1)
196-196: ⚠️ Potential issue | 🟡 Minor — Update stale batching terminology.

`StreamSocketConfig` no longer exposes batching tunables here; this KDoc should refer to aggregation/event processing instead. As per coding guidelines, “Keep processor/queue behaviour documented via KDoc or dedicated docs when semantics evolve.”
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt` at line 196, update the KDoc for StreamClient.kt's constructor parameter socketConfig to replace "batching" terminology with the new semantics: mention aggregation/event processing (or processor/queue behaviour) instead of batching tunables; specifically update the `@param` socketConfig text that currently references "batching" to describe aggregation/event processing configuration (URL, auth, timing, event aggregation/processing) and add a brief note to refer to processor/queue docs if needed, ensuring the symbol StreamSocketConfig is referenced by name in the KDoc.

stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt (1)
66-83: ⚠️ Potential issue | 🟠 Major — Tighten or hide the custom aggregator contract.

`StreamEventAggregator<*>` accepts any implementation, but `StreamSocketSession` only handles composite socket events and aggregated composite socket events. A custom aggregator that emits product events directly will compile but be ignored or fail at runtime. Prefer a typed factory/seam that receives the SDK parser, or document and enforce the exact emitted payload contract.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt` around lines 66 - 83, The StreamComponentProvider currently accepts a generic StreamEventAggregator<*> which allows aggregators that emit unsupported event types; restrict the contract to the exact composite socket event type used by the session (or hide the aggregator behind a typed factory). Update the StreamComponentProvider constructor parameter from StreamEventAggregator<*> to a concrete type matching the SDK socket payload (e.g., StreamEventAggregator<StreamSocketEventComposite> or an interface like StreamCompositeEventAggregator) and adjust any callers, or replace the parameter with a factory/provider that receives the SDK parser and returns a correctly-typed aggregator; ensure StreamSocketSession is updated to consume only that concrete aggregator type so custom aggregators that emit non-composite events are compile-time rejected.

stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamSocketConfig.kt (1)
123-145: ⚠️ Potential issue | 🟡 Minor — Validate aggregation config at the factory boundary.

These factories accept invalid aggregation values and defer failure until the socket pipeline builds the aggregator. Add shared validation here so misconfiguration fails close to the caller.
Suggested shared validation shape:

```diff
 public fun jwt(
     url: StreamWsUrl,
     apiKey: StreamApiKey,
     clientInfoHeader: StreamHttpClientInfoHeader,
@@
     aggregationThreshold: Int = DEFAULT_AGGREGATION_THRESHOLD,
     aggregationMaxWindowMs: Long = DEFAULT_AGGREGATION_MAX_WINDOW_MS,
     aggregationDispatchQueueCapacity: Int = DEFAULT_AGGREGATION_DISPATCH_QUEUE_CAPACITY,
-): StreamSocketConfig =
-    StreamSocketConfig(
+): StreamSocketConfig {
+    validateAggregationConfig(
+        aggregationThreshold,
+        aggregationMaxWindowMs,
+        aggregationDispatchQueueCapacity,
+    )
+    return StreamSocketConfig(
         url = url,
@@
         aggregationDispatchQueueCapacity = aggregationDispatchQueueCapacity,
     )
+}
+
+private fun validateAggregationConfig(
+    aggregationThreshold: Int,
+    aggregationMaxWindowMs: Long,
+    aggregationDispatchQueueCapacity: Int,
+) {
+    require(aggregationThreshold > 0) { "aggregationThreshold must be > 0" }
+    require(aggregationMaxWindowMs > 0) { "aggregationMaxWindowMs must be > 0" }
+    require(aggregationDispatchQueueCapacity > 0) {
+        "aggregationDispatchQueueCapacity must be > 0"
+    }
+}
```

Apply the same helper in `anonymous` and `custom`.

Also applies to: 162-184, 202-226
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamSocketConfig.kt` around lines 123 - 145, Add validation at the factory boundary by introducing a shared helper (e.g., validateAggregationConfig(aggregationThreshold: Int, aggregationMaxWindowMs: Long, aggregationDispatchQueueCapacity: Int)) and call it from the StreamSocketConfig.jwt, StreamSocketConfig.anonymous and StreamSocketConfig.custom factory methods before constructing the StreamSocketConfig; the validator should check that aggregationThreshold > 0, aggregationMaxWindowMs >= 0, and aggregationDispatchQueueCapacity > 0 (or whatever project constraints apply) and throw an IllegalArgumentException with a clear message if any value is invalid so misconfiguration fails fast at the caller.
🧹 Nitpick comments (1)
stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamEventAggregatorImplTest.kt (1)
87-160: Use coroutines-test and virtual time instead of real dispatchers and blocking sleeps.

These test methods use `Dispatchers.Default`, `Thread.sleep`, and `CountDownLatch`, which make tests slower and potentially flaky. Replace with `runTest`, `StandardTestDispatcher(testScheduler)`, and `advanceTimeBy`/`advanceUntilIdle` for deterministic, controllable timing.

Per coding guidelines: "Unit tests live under each module's src/test/java; use MockK and coroutines-test to control timing."
Applies to lines 87–160, 455–500, 532–573.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamEventAggregatorImplTest.kt` around lines 87 - 160, The test uses real Dispatchers, Thread.sleep and CountDownLatch causing flakiness; convert the test function `spike triggers aggregated delivery grouped by type` to use runTest with a StandardTestDispatcher(testScheduler) (pass that dispatcher/scheduler as the CoroutineScope to StreamEventAggregator), replace Thread.sleep and CountDownLatch with test-scheduler advances (advanceTimeBy / advanceUntilIdle) and coroutine-friendly synchronization (e.g. CompletableDeferred or assertions after advanceUntilIdle), and remove direct use of Dispatchers.Default — modify creation of StreamEventAggregator<TestEvent> (symbols: StreamEventAggregator, typeExtractor, deserializer, aggregationThreshold, maxWindowMs), its start()/stop()/offer() calls and the onEvent handler to work under runTest and driven virtual time so the inbox processing is deterministically advanced by testScheduler rather than sleeping or latching.
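The virtual-time pattern suggested in this nitpick can be sketched roughly as below, using kotlinx-coroutines-test and JUnit. `WindowBuffer` is a hypothetical stand-in for the aggregator's max-window flush, not the SDK class:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.runTest
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Test

// Illustrative stand-in for a max-window flush: buffer an event, then flush
// after windowMs. Under runTest, delay() consumes virtual time only.
class WindowBuffer(private val windowMs: Long) {
    val flushed = mutableListOf<List<String>>()
    private val pending = mutableListOf<String>()

    suspend fun offerThenFlushAfterWindow(event: String) {
        pending += event
        delay(windowMs)              // virtual time under runTest
        flushed += pending.toList()
        pending.clear()
    }
}

class WindowBufferTest {
    @Test
    fun `max window flush is driven by virtual time, not Thread sleep`() = runTest {
        val buffer = WindowBuffer(windowMs = 200)
        launch { buffer.offerThenFlushAfterWindow("a") }
        advanceTimeBy(100)                   // window not yet elapsed
        assertTrue(buffer.flushed.isEmpty())
        advanceTimeBy(150)                   // crosses the 200 ms window
        assertEquals(listOf(listOf("a")), buffer.flushed)
    }
}
```

The test completes instantly regardless of the configured window, which is the point of the suggested refactor.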
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamAggregatedEvent.kt`:
- Line 52: The StreamAggregatedEvent constructor currently stores the caller's
Map and List instances directly on the public val events, allowing callers to
mutate backing collections; fix it in StreamAggregatedEvent by performing a
defensive deep copy in the constructor: create a new Map from the supplied
events where each entry's List value is copied to an immutable/defensive List
(e.g., toList()/List.copyOf or Collections.unmodifiableList) and expose that
immutable map (e.g., via toMap()/Map.copyOf or an unmodifiable wrapper) as the
public events property so no external mutations can affect the internal state.
In
`@stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamEventAggregator.kt`:
- Around line 38-42: The KDoc for StreamEventAggregator currently states
aggregation occurs when events "exceed [aggregationThreshold]" but the code and
tests aggregate at or above the threshold; update the KDoc to reflect that
behavior by changing the wording to "at or above [aggregationThreshold]" (or
equivalent) so it matches the implementation; ensure you reference the
aggregationThreshold property and StreamAggregatedEvent in the KDoc comment
above the StreamEventAggregator class/function to remove the off-by-one
ambiguity for SDK consumers.
- Around line 109-126: Validate the input parameters in the
StreamEventAggregator factory function before constructing
StreamEventAggregatorImpl: ensure aggregationThreshold > 0, maxWindowMs > 0,
dispatchQueueCapacity > 0, and inboxCapacity is either Channel.UNLIMITED or >=
1; if any check fails, throw an IllegalArgumentException with a clear message
referencing the offending parameter. Keep these checks in the public
StreamEventAggregator(...) function (the factory that calls
StreamEventAggregatorImpl) so invalid values fail fast prior to channel
construction or aggregator creation.
In
`@stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamEventAggregatorImpl.kt`:
- Line 50: Change StreamEventAggregatorImpl to accept a non-null StreamLogger
via constructor injection instead of using the nullable internal var logger:
StreamLogger?; remove or stop relying on the mutable nullable property and
update all usages (queue-full warnings, handler/deserializer error calls) to use
the constructor-provided logger. Update the factory/component provider that
creates StreamEventAggregatorImpl to pass the tagged StreamLogger instance (use
the same pattern where other components obtain a tagged logger) and ensure other
similar classes/occurrences referenced around the file (the members at the
locations corresponding to lines ~104-107) are also converted to
constructor-injected non-null loggers. Ensure logs follow the tagging guideline
and do not include secrets/tokens.
- Around line 53-87: stop() currently closes inbox and dispatchQueue but resets
started to false, so subsequent offer() calls auto-start with closed channels
and dead jobs; fix by either making stop() terminal or ensuring full restart by
recreating channels and jobs in start(). Specifically, choose one approach: (A)
make stop() terminal by setting a separate AtomicBoolean like permanentlyStopped
(set to true in stop()) and update offer()/start() to no-op when
permanentlyStopped is true; or (B) make start() reinitialize inbox and
dispatchQueue (new Channel(...) instances), ensure collectorJob/dispatcherJob
are launched fresh from scope (calling runCollector/runDispatcher), and have
stop() cancel jobs and close old channels but not allow leftover closed
references (also clear collectorJob/dispatcherJob). Update checks in offer() to
guard with inbox.isClosedForSend (or permanentlyStopped) to avoid trying to send
to closed channels. Use the symbols inbox, dispatchQueue, started, stop(),
start(), offer(), collectorJob, dispatcherJob, runCollector, runDispatcher,
scope.
- Around line 59-63: The mutable var eventHandler must be made thread-safe:
replace the plain declaration with an AtomicReference<suspend (Any) -> Unit>
(initialized with the no-op) and update onEvent(handler: suspend (Any) -> Unit)
to call eventHandler.set(handler); also update any reads (e.g., in
runDispatcher() or where the handler is invoked) to use
eventHandler.get().invoke(...) (or get() first into a local val) so registration
and invocation are safely published across threads.
In
`@stream-android-core/src/main/java/io/getstream/android/core/internal/socket/StreamSocketSession.kt`:
- Around line 461-527: The subscriptionManager.forEach calls in
handleSingleCompositeEvent and handleAggregatedEvent currently ignore the Result
returned and thus drop listener exceptions; update each
subscriptionManager.forEach invocation (both the individual core dispatch in
handleSingleCompositeEvent, the core dispatch inside handleAggregatedEvent, and
the final productAggregated dispatch) to capture the Result and call onFailure
to log the error (e.g., use Result.onFailure { logger.e(it) { "Listener dispatch
failed for <core/product/aggregated> event: $event or $type" } } ), ensuring all
listener.onEvent(...) invocations have failure logging instead of being silently
discarded.
- Around line 88-95: The code creates an IllegalStateException with the raw
WebSocket payload and logs it in onMessage (see aggregator.offer(text),
IllegalStateException(...), logger.e(...), disconnect(error)), which can leak
sensitive data; change the error and log to omit the raw payload and instead
record safe metadata (e.g., payload length, message type, a short non-reversible
hash or correlation id, and a note that the payload was dropped). Update the
IllegalStateException message and the logger.e call to include only that
sanitized metadata (no raw text or tokens) before calling disconnect(error).
- Around line 236-245: The aggregator handler is registered with
aggregator.onEvent but the aggregator is never started, so its
collector/dispatcher coroutines may never run; update StreamSocketSession to
call aggregator.start() immediately after registering the onEvent handler
(before any offer calls) so events are processed; ensure you still stop/cleanup
the aggregator in the existing cleanup code and reference the existing symbols
aggregator, onEvent, handleAggregatedEvent, and handleSingleCompositeEvent when
making the change.
- Around line 530-536: cleanup() currently calls aggregator.stop() and drops the
returned Result, so any shutdown failures are invisible; update cleanup() to
capture the Result from aggregator.stop() (after healthMonitor.stop()), inspect
it and log failures via logger.e (include the Result's throwable/message and
contextual text like "[cleanup] Aggregator stop failed"), and if necessary
propagate or handle cancellation consistently with other teardown steps;
reference the cleanup() method, cleaned.compareAndSet(...),
healthMonitor.stop(), aggregator.stop(), and logger.d/logger.e when making the
change.
In
`@stream-android-core/src/test/java/io/getstream/android/core/api/StreamClientConfigFactoryTest.kt`:
- Around line 192-208: Test currently only checks aggregator exists; update it
to assert the aggregator's configured values match the custom StreamSocketConfig
by using the existing private-field test helpers: after obtaining aggregator via
(client as StreamClientImpl<*>).readPrivateField("socketSession") and then
.readPrivateField("aggregator"), read the aggregator's internal fields (e.g.,
"threshold" or "aggregationThreshold", "maxWindowMs" or
"aggregationMaxWindowMs", and "dispatchQueueCapacity" or
"aggregationDispatchQueueCapacity") using readPrivateField and add assertions
like assertEquals(20, threshold), assertEquals(200L, maxWindowMs), and
assertEquals(8, dispatchQueueCapacity) so the test verifies the custom values
instead of only presence.
In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamEventAggregatorImplTest.kt`:
- Around line 532-573: The test dispatch queue fullness isn't asserted: update
the test for StreamEventAggregator (created with dispatchQueueCapacity = 1) to
inject a test logger or mock into the aggregator so you can assert a warning was
logged when the queue was full, and also assert that the number of delivered
events collected by the onEvent handler (received) is strictly less than the
number of events offered via offer (i.e., delivered < total offered) to prove
drops occurred; keep start/stop and the slow onEvent handler and use the
existing latch to ensure at least one delivery while verifying the log warning
and reduced delivery count after the bursts.
In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/socket/StreamSocketSessionTest.kt`:
- Around line 916-920: Tighten the assertions in StreamSocketSessionTest to
verify the core "connection.error" is forwarded as an individual event and NOT
included inside the product aggregate: assert there is exactly one
StreamClientConnectionErrorEvent in seenEvents (or use
seenEvents.filterIsInstance<StreamClientConnectionErrorEvent>().size == 1),
assert there is at least one StreamAggregatedEvent but that none of its inner
events/buckets represent the core "connection.error" bucket (inspect the
aggregated event payloads/keys), and keep the existing Disconnected state check;
update references to seenEvents, StreamClientConnectionErrorEvent,
StreamAggregatedEvent and StreamConnectionState.Disconnected to locate and
change the test.
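The `AtomicReference`-based handler registration suggested in the inline comments above can be sketched as follows. `HandlerSlot` and its members are hypothetical stand-ins for the aggregator's handler field, not SDK symbols:

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Thread-safe handler slot: registration and invocation may happen on
// different threads, so the handler is published via AtomicReference.
class HandlerSlot {
    // No-op default: events dispatched before registration are safely ignored.
    private val handler = AtomicReference<suspend (Any) -> Unit> { }

    fun onEvent(h: suspend (Any) -> Unit) = handler.set(h)

    suspend fun dispatch(event: Any) {
        // Read once into a local so a re-registration mid-dispatch is still safe.
        val current = handler.get()
        current(event)
    }
}
```

Compared with a plain `var`, the `AtomicReference` guarantees safe publication of the handler across the collector and dispatcher coroutines without extra locking.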
---
Outside diff comments:
In
`@stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt`:
- Around line 66-83: The StreamComponentProvider currently accepts a generic
StreamEventAggregator<*> which allows aggregators that emit unsupported event
types; restrict the contract to the exact composite socket event type used by
the session (or hide the aggregator behind a typed factory). Update the
StreamComponentProvider constructor parameter from StreamEventAggregator<*> to a
concrete type matching the SDK socket payload (e.g.,
StreamEventAggregator<StreamSocketEventComposite> or an interface like
StreamCompositeEventAggregator) and adjust any callers, or replace the parameter
with a factory/provider that receives the SDK parser and returns a
correctly-typed aggregator; ensure StreamSocketSession is updated to consume
only that concrete aggregator type so custom aggregators that emit non-composite
events are compile-time rejected.
In
`@stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamSocketConfig.kt`:
- Around line 123-145: Add validation at the factory boundary by introducing a
shared helper (e.g., validateAggregationConfig(aggregationThreshold: Int,
aggregationMaxWindowMs: Long, aggregationDispatchQueueCapacity: Int)) and call
it from the StreamSocketConfig.jwt, StreamSocketConfig.anonymous and
StreamSocketConfig.custom factory methods before constructing the
StreamSocketConfig; the validator should check that aggregationThreshold > 0,
aggregationMaxWindowMs >= 0, and aggregationDispatchQueueCapacity > 0 (or
whatever project constraints apply) and throw an IllegalArgumentException with a
clear message if any value is invalid so misconfiguration fails fast at the
caller.
In
`@stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt`:
- Line 196: Update the KDoc for StreamClient.kt's constructor parameter
socketConfig to replace "batching" terminology with the new semantics: mention
aggregation/event processing (or processor/queue behaviour) instead of batching
tunables; specifically update the `@param` socketConfig text that currently
references "batching" to describe aggregation/event processing configuration
(URL, auth, timing, event aggregation/processing) and add a brief note to refer
to processor/queue docs if needed, ensuring the symbol StreamSocketConfig is
referenced by name in the KDoc.
---
Nitpick comments:
In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamEventAggregatorImplTest.kt`:
- Around line 87-160: The test uses real Dispatchers, Thread.sleep and
CountDownLatch causing flakiness; convert the test function `spike triggers
aggregated delivery grouped by type` to use runTest with a
StandardTestDispatcher(testScheduler) (pass that dispatcher/scheduler as the
CoroutineScope to StreamEventAggregator), replace Thread.sleep and
CountDownLatch with test-scheduler advances (advanceTimeBy / advanceUntilIdle)
and coroutine-friendly synchronization (e.g. CompletableDeferred or assertions
after advanceUntilIdle), and remove direct use of Dispatchers.Default — modify
creation of StreamEventAggregator<TestEvent> (symbols: StreamEventAggregator,
typeExtractor, deserializer, aggregationThreshold, maxWindowMs), its
start()/stop()/offer() calls and the onEvent handler to work under runTest and
driven virtual time so the inbox processing is deterministically advanced by
testScheduler rather than sleeping or latching.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: d8c93c2f-ced1-4f19-821c-207e5c4a2a17
📒 Files selected for processing (13)

- stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt
- stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt
- stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamSocketConfig.kt
- stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamAggregatedEvent.kt
- stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamEventAggregator.kt
- stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamEventAggregatorImpl.kt
- stream-android-core/src/main/java/io/getstream/android/core/internal/serialization/StreamCompositeEventSerializationImpl.kt
- stream-android-core/src/main/java/io/getstream/android/core/internal/socket/StreamSocketSession.kt
- stream-android-core/src/test/java/io/getstream/android/core/api/StreamClientConfigFactoryTest.kt
- stream-android-core/src/test/java/io/getstream/android/core/api/StreamClientFactoryTest.kt
- stream-android-core/src/test/java/io/getstream/android/core/api/model/config/StreamSocketConfigTest.kt
- stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamEventAggregatorImplTest.kt
- stream-android-core/src/test/java/io/getstream/android/core/internal/socket/StreamSocketSessionTest.kt
… sanitized logging

- Defensive copy in StreamAggregatedEvent (mutable lists no longer leak)
- AtomicReference for eventHandler (thread-safe handler registration)
- Terminal stop() with closed guard (prevent resurrection after stop)
- Parameter validation in factory (require threshold/window/capacity > 0)
- Sanitize socket message logging (log byte count, not raw payload)
- Explicit aggregator.start() in session connect (not just auto-start)
- Handle listener dispatch Result failures with onFailure logging
- Log aggregator.stop() failures in cleanup
- KDoc "exceed" → "reach or exceed" for threshold semantics
…, add 10K stress test

- Replace Channel with StreamRestartableChannel for inbox and dispatch queue — supports stop/start lifecycle across socket reconnect cycles
- Remove auto-start from offer() — session manages lifecycle explicitly
- Remove terminal closed guard (no longer needed with restartable channels)
- Add braces to all single-line if statements
- Add 10K event stress test: 10,000 events → 200 handler calls (50x reduction), 129ms total processing time
- Replace "offer returns false after stop" test with "stop then start resumes processing" test verifying restartable behavior
- All events delivered (count == offered)
- Each batch ≤ threshold events
- Fewer handler calls than raw events
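Assuming the aggregator's windowing behaves like fixed-size chunking, these invariants can be checked with a small sketch (`chunked()` is a stand-in for the real windowing logic, not the SDK implementation):

```kotlin
// Verify the three invariants: nothing lost, batches bounded by the
// threshold, and fewer dispatches than raw events.
fun main() {
    val threshold = 100
    val offered = List(10_000) { "event-$it" }
    val batches = offered.chunked(threshold)

    check(batches.sumOf { it.size } == offered.size) // all events delivered
    check(batches.all { it.size <= threshold })      // each batch ≤ threshold
    check(batches.size < offered.size)               // fewer handler calls
}
```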
StreamBatcher with StreamEventAggregator instead in the StreamSocketSession
- Wire logger through factory constructor (no longer nullable internal var only)
- Assert custom aggregation values in config factory test (not just existence)
- Assert queue-full warning logged in dispatch queue test
- Tighten aggregate shape assertion in session test (verify core events excluded from product aggregate, verify exact product event contents)
Test all require() guards: aggregationThreshold, maxWindowMs, and dispatchQueueCapacity reject zero and negative values.
Replace two loose lambdas (typeExtractor + deserializer) and separate config values with a validated policy object. Policy combines behavior (type extraction, deserialization) with tuning (threshold, window, queue capacity). Validated at construction via require() — once you have a policy, it's guaranteed valid. StreamSocketConfig keeps the tuning values for product SDK configuration. The factory builds the policy internally from config + event parser. Custom aggregator injection via StreamComponentProvider bypasses the policy entirely.
Replace manual try/catch blocks with runCatchingCancellable to ensure CancellationException is always rethrown (structured concurrency) and no catch blocks are left empty. All failure paths now log via onFailure.
…external calls

Collector and dispatcher loops catch ClosedReceiveChannelException explicitly for graceful shutdown logging. CancellationException propagates naturally (structured concurrency). runCatchingCancellable is used only for external code (handler, deserializer, type extractor) that can throw unexpectedly.
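A plausible shape for such a `runCatchingCancellable` helper (a sketch; the SDK's actual helper may differ):

```kotlin
import kotlin.coroutines.cancellation.CancellationException

// Like runCatching, but CancellationException is rethrown so structured
// concurrency cancellation is never swallowed into a Result.
inline fun <T> runCatchingCancellable(block: () -> T): Result<T> =
    try {
        Result.success(block())
    } catch (e: CancellationException) {
        throw e
    } catch (e: Throwable) {
        Result.failure(e)
    }

fun main() {
    check(runCatchingCancellable { 42 }.getOrNull() == 42)
    check(runCatchingCancellable { error("boom") }.isFailure)
    try {
        runCatchingCancellable { throw CancellationException("cancelled") }
        check(false) // unreachable: cancellation must propagate
    } catch (e: CancellationException) {
        // rethrown, as structured concurrency requires
    }
}
```

Plain `runCatching` would capture the `CancellationException` into a `Result`, leaving the coroutine running after its scope was cancelled, which is exactly the bug this helper avoids.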
- offer before start returns false
- double start is idempotent (no duplicate workers)
- handler exception on aggregated event does not break dispatcher

Line coverage: 89% → 93%, Branch: 74% → 78%, Instruction: 91% → 92%.
The impl now takes StreamEventAggregationPolicy<T> instead of 5 separate fields. Policy is the single source of truth for behavior and configuration — no unpacking at the boundary.
Fields moved from impl to policy — reflection path updated.
Remove unnecessary Thread.sleep, assert each offer succeeds, increase latch timeout from 30s to 60s for loaded CI runners.
gpunto left a comment:

Looks good, just a comment on the aggregation type
```kotlin
 * @property events Events grouped by type string. Each list preserves arrival order.
 */
@StreamInternalApi
public class StreamAggregatedEvent<T>(events: Map<String, List<T>>) {
```
I'm not sure we can aggregate into a map at the core level, since SDKs might rely on the order of events. For example, we might receive "reaction X added" -> "reaction X removed", and if we process them in the opposite order we end up in the wrong state. This can happen even if we use LinkedHashMap, for example if we receive this sequence:
- reaction removed for X
- reaction added for Y
- reaction removed for Y
Insertion order would put the "reaction removed" key before "reaction added", but for Y "added" should be processed before "removed"
I think we can aggregate into a simple List, and then clients can implement some kind of transaction operation where they apply all updates sequentially but in one state update, so they still emit only once.
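The hazard and the proposed list-based alternative can be illustrated with a small sketch (event names and the fold are illustrative, not SDK code):

```kotlin
// Grouping by type can reorder events for the same entity, while a flat list
// applied in one "transaction" preserves arrival order.
fun main() {
    val events = listOf(
        "reaction.removed" to "X",
        "reaction.added" to "Y",
        "reaction.removed" to "Y",
    )

    // Group-by-type with insertion order: the "removed" bucket comes first,
    // so Y's removal would be processed before its addition.
    val grouped = events.groupBy({ it.first }, { it.second })
    check(grouped.keys.first() == "reaction.removed")

    // Flat list keeps arrival order; the client folds it into one state update
    // and still emits only once.
    val state = events.fold(setOf<String>()) { acc, (type, id) ->
        if (type == "reaction.added") acc + id else acc - id
    }
    check(state.isEmpty()) // Y added then removed; X's removal is a no-op
}
```

Processing the grouped map bucket-by-bucket would leave Y present in the final state, which is the wrong-state scenario described above.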



Goal

Replace the naive batching approach in StreamSocketSession with an adaptive event aggregator that drastically reduces state/UI updates during event spikes (10K events → ~10 grouped dispatches instead of 10K individual ones).

Implementation
New components:

- StreamEventAggregator<T> — public interface with adaptive spike detection
- StreamEventAggregatorImpl<T> — two decoupled coroutines (collector + dispatcher) with a maxWindowMs ceiling
- StreamAggregatedEvent<T> — grouped events by type string, dispatched during spikes

Adaptive behavior:

- Low traffic: events are delivered individually via onEvent(event) — zero overhead
- Spike: events are grouped into a StreamAggregatedEvent — one dispatch per window

Wiring changes:

- StreamSocketSession: replaced StreamBatcher<String> with StreamEventAggregator<*>. Core events (connection errors) are handled individually even during spikes; product events are wrapped in StreamAggregatedEvent<T>
- StreamSocketConfig: replaced batchSize/batchInitialDelayMs/batchMaxDelayMs with aggregationThreshold/aggregationMaxWindowMs/aggregationDispatchQueueCapacity
- StreamComponentProvider: replaced the batcher field with eventAggregator
- StreamBatcher is kept as a general-purpose primitive, just removed from the socket pipeline

BREAKING CHANGE: StreamSocketConfig batch params and StreamComponentProvider.batcher replaced.

Testing

- New tests for StreamEventAggregatorImpl (passthrough, spike aggregation, maxWindow, deserialization failures, lifecycle, backpressure, null types)
- Updated StreamSocketSessionTest for the new aggregator wiring
- Updated StreamClientFactoryTest, StreamClientConfigFactoryTest, StreamSocketConfigTest

pr: core
Summary by CodeRabbit
Refactor
New Features
Breaking Changes
- batchSize, batchInitialDelayMs, batchMaxDelayMs replaced with aggregationThreshold, aggregationMaxWindowMs, aggregationDispatchQueueCapacity.
- batcher replaced with eventAggregator.