Skip to content

Replace StreamBatcher with StreamEventAggregator instead in the StreamSocketSession#57

Open
aleksandar-apostolov wants to merge 14 commits intodevelopfrom
feature/event-aggregator
Open

Replace StreamBatcher with StreamEventAggregator instead in the StreamSocketSession#57
aleksandar-apostolov wants to merge 14 commits intodevelopfrom
feature/event-aggregator

Conversation

@aleksandar-apostolov
Copy link
Copy Markdown
Collaborator

@aleksandar-apostolov aleksandar-apostolov commented Apr 22, 2026

Goal

Replace the naive batching approach in StreamSocketSession with an adaptive event aggregator that drastically reduces state/UI updates during event spikes (10K events → ~10 grouped dispatches instead of 10K individual ones).

Implementation

New components:

  • StreamEventAggregator<T> — public interface with adaptive spike detection
  • StreamEventAggregatorImpl<T> — two decoupled coroutines:
    • Collector: drains inbox, groups by type when threshold exceeded, caps collection at maxWindowMs
    • Dispatcher: sequential event delivery from bounded dispatch queue
  • StreamAggregatedEvent<T> — grouped events by type string, dispatched during spikes

Adaptive behavior:

  • Low traffic (< threshold): events flow through individually via onEvent(event) — zero overhead
  • Spike (≥ threshold): events grouped by type into StreamAggregatedEvent — one dispatch per window
  • Backpressure: events accumulate naturally while dispatcher is busy; dispatch queue capped with warning logs

Wiring changes:

  • StreamSocketSession: replaced StreamBatcher<String> with StreamEventAggregator<*>. Core events (connection errors) handled individually even during spikes; product events wrapped in StreamAggregatedEvent<T>
  • StreamSocketConfig: replaced batchSize/batchInitialDelayMs/batchMaxDelayMs with aggregationThreshold/aggregationMaxWindowMs/aggregationDispatchQueueCapacity
  • StreamComponentProvider: replaced batcher field with eventAggregator
  • StreamBatcher kept as general-purpose primitive, just removed from socket pipeline

BREAKING CHANGE: StreamSocketConfig batch params and StreamComponentProvider.batcher replaced.

Testing

  • 8 new tests for StreamEventAggregatorImpl (passthrough, spike aggregation, maxWindow, deserialization failures, lifecycle, backpressure, null types)
  • Updated StreamSocketSessionTest for new aggregator wiring
  • Updated StreamClientFactoryTest, StreamClientConfigFactoryTest, StreamSocketConfigTest
  • 625/626 tests pass (1 pre-existing unrelated failure)

pr: core

Summary by CodeRabbit

  • Refactor

    • Replaced message batching with event aggregation for WebSocket message processing.
  • New Features

    • Events can now be grouped by type and dispatched in aggregated batches based on configurable thresholds and time windows.
  • Breaking Changes

    • Socket configuration parameters updated: batchSize, batchInitialDelayMs, batchMaxDelayMs replaced with aggregationThreshold, aggregationMaxWindowMs, aggregationDispatchQueueCapacity.
    • Component override property batcher replaced with eventAggregator.

Replace the batching approach in StreamSocketSession with an adaptive
event aggregator that switches between individual and aggregated
delivery based on traffic volume.

Low traffic: events pass through individually (no overhead).
Spike: events grouped by type into StreamAggregatedEvent, enabling
product SDKs to apply one state update instead of thousands.

Architecture: two decoupled coroutines (collector + dispatcher) with
configurable threshold, maxWindow ceiling, and bounded dispatch queue.

BREAKING CHANGE: StreamSocketConfig batch params replaced with
aggregation params. StreamComponentProvider.batcher replaced with
eventAggregator.
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 22, 2026

Walkthrough

Replaced the library's message batching component with an event aggregation system. Added StreamEventAggregator interface and StreamEventAggregatorImpl for coroutine-driven event buffering and dispatching. Updated StreamSocketConfig parameters from batch-related to aggregation-related configuration. Modified client initialization and socket session to wire and use the new aggregator instead of batcher.

Changes

Cohort / File(s) Summary
Core API Configuration
StreamClient.kt, StreamComponentProvider.kt, StreamSocketConfig.kt
Replaced batching configuration (batchSize, batchInitialDelayMs, batchMaxDelayMs) with aggregation parameters (aggregationThreshold, aggregationMaxWindowMs, aggregationDispatchQueueCapacity). Updated client factory and component provider to accept StreamEventAggregator instead of StreamBatcher.
New Aggregation Components
StreamAggregatedEvent.kt, StreamEventAggregator.kt, StreamEventAggregatorImpl.kt
Introduced public aggregation interface with lifecycle methods (start(), offer(), onEvent(), stop()), a generic aggregated event container, and a coroutine-driven implementation with bounded buffering, threshold-based grouping, and windowing logic.
Internal Processing
StreamCompositeEventSerializationImpl.kt, StreamSocketSession.kt
Increased visibility of peekType() method. Replaced batch processing flow with aggregation flow; socket session now routes incoming WebSocket messages through aggregator.offer() and handles both individual and aggregated event deliveries.
Test Updates
StreamClientConfigFactoryTest.kt, StreamClientFactoryTest.kt, StreamSocketConfigTest.kt, StreamEventAggregatorImplTest.kt, StreamSocketSessionTest.kt
Updated all test mocking, assertions, and configuration to use StreamEventAggregator instead of StreamBatcher. Added comprehensive test suite for aggregator with scenarios covering low-traffic, spike, timeout, error handling, and queue-capacity edge cases.

Sequence Diagram

sequenceDiagram
    actor User
    participant SocketSession as Socket<br/>Session
    participant Aggregator as Event<br/>Aggregator
    participant Buffer as Buffered<br/>State
    participant Handler as Event<br/>Handler
    participant Listener as Event<br/>Listener

    User->>SocketSession: WebSocket message arrives
    SocketSession->>Aggregator: offer(raw)
    Aggregator->>Buffer: Enqueue raw event
    
    alt Low Traffic (Below Threshold)
        Buffer->>Aggregator: Deserialize single event
        Aggregator->>Handler: Individual event T
        Handler->>Listener: onEvent(T)
    else Spike (≥ Threshold)
        Buffer->>Aggregator: Deserialize & group by type
        Aggregator->>Handler: StreamAggregatedEvent<Map<Type, List<T>>>
        Handler->>Listener: onEvent(StreamAggregatedEvent)
    else Timeout (Max Window)
        Buffer->>Aggregator: maxWindowMs elapsed
        Aggregator->>Handler: Dispatch buffered events
        Handler->>Listener: onEvent(...)
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~55 minutes

Poem

🐰 Hop, hop! The batches are gone, we're aggregating strong!
Events flow and buffer with care, grouped by type through the air,
Coroutines keep time with a window's sweet chime,
One aggregator to rule them all—a streaming paradigm so fine!
✨ The old batch bringer bids farewell to the Spring! 🌿

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 20.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description check ✅ Passed The description comprehensively covers all required template sections: Goal (spike reduction), Implementation (components, adaptive behavior, wiring changes, breaking changes), Testing (test counts and status), and Checklist items are addressed.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Title check ✅ Passed The title accurately describes the main change: replacing StreamBatcher with StreamEventAggregator in StreamSocketSession, which is the core objective of this PR.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feature/event-aggregator

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Apr 22, 2026

PR checklist ✅

All required conditions are satisfied:

  • Title length is OK (or ignored by label).
  • At least one pr: label exists.
  • Sections ### Goal, ### Implementation, and ### Testing are filled.

🎉 Great job! This PR is ready for review.

@aleksandar-apostolov aleksandar-apostolov changed the title feat(processing): replace StreamBatcher with StreamEventAggregator Replace StreamBatcher with StreamEventAggregator Apr 22, 2026
@aleksandar-apostolov aleksandar-apostolov changed the title Replace StreamBatcher with StreamEventAggregator Replace StreamBatcher with StreamEventAggregator inestead Apr 22, 2026
@aleksandar-apostolov aleksandar-apostolov changed the title Replace StreamBatcher with StreamEventAggregator inestead Replace StreamBatcher with StreamEventAggregator instead Apr 22, 2026
…alizers

Wrap typeExtractor and deserializer calls in try-catch so a throwing
function doesn't kill the collector coroutine. Previously only
Result.failure was handled — a thrown exception would stop all event
processing.

Also adds 6 edge case tests:
- typeExtractor throws → collector survives
- deserializer throws (not Result.failure) → collector survives
- handler throws → subsequent events still delivered
- exactly-at-threshold boundary → triggers aggregation
- all events fail deserialization → batch dropped, no crash
- dispatch queue full → events dropped with warning, no crash
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 13

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt (1)

196-196: ⚠️ Potential issue | 🟡 Minor

Update stale batching terminology.

StreamSocketConfig no longer exposes batching tunables here; this KDoc should refer to aggregation/event processing instead.

As per coding guidelines, “Keep processor/queue behaviour documented via KDoc or dedicated docs when semantics evolve.”

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt`
at line 196, Update the KDoc for StreamClient.kt's constructor parameter
socketConfig to replace "batching" terminology with the new semantics: mention
aggregation/event processing (or processor/queue behaviour) instead of batching
tunables; specifically update the `@param` socketConfig text that currently
references "batching" to describe aggregation/event processing configuration
(URL, auth, timing, event aggregation/processing) and add a brief note to refer
to processor/queue docs if needed, ensuring the symbol StreamSocketConfig is
referenced by name in the KDoc.
stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt (1)

66-83: ⚠️ Potential issue | 🟠 Major

Tighten or hide the custom aggregator contract.

StreamEventAggregator<*> accepts any implementation, but StreamSocketSession only handles composite socket events and aggregated composite socket events. A custom aggregator that emits product events directly will compile but be ignored or fail at runtime. Prefer a typed factory/seam that receives the SDK parser, or document and enforce the exact emitted payload contract.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt`
around lines 66 - 83, The StreamComponentProvider currently accepts a generic
StreamEventAggregator<*> which allows aggregators that emit unsupported event
types; restrict the contract to the exact composite socket event type used by
the session (or hide the aggregator behind a typed factory). Update the
StreamComponentProvider constructor parameter from StreamEventAggregator<*> to a
concrete type matching the SDK socket payload (e.g.,
StreamEventAggregator<StreamSocketEventComposite> or an interface like
StreamCompositeEventAggregator) and adjust any callers, or replace the parameter
with a factory/provider that receives the SDK parser and returns a
correctly-typed aggregator; ensure StreamSocketSession is updated to consume
only that concrete aggregator type so custom aggregators that emit non-composite
events are compile-time rejected.
stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamSocketConfig.kt (1)

123-145: ⚠️ Potential issue | 🟡 Minor

Validate aggregation config at the factory boundary.

These factories accept invalid aggregation values and defer failure until the socket pipeline builds the aggregator. Add shared validation here so misconfiguration fails close to the caller.

Suggested shared validation shape
         public fun jwt(
             url: StreamWsUrl,
             apiKey: StreamApiKey,
             clientInfoHeader: StreamHttpClientInfoHeader,
@@
             aggregationThreshold: Int = DEFAULT_AGGREGATION_THRESHOLD,
             aggregationMaxWindowMs: Long = DEFAULT_AGGREGATION_MAX_WINDOW_MS,
             aggregationDispatchQueueCapacity: Int = DEFAULT_AGGREGATION_DISPATCH_QUEUE_CAPACITY,
-        ): StreamSocketConfig =
-            StreamSocketConfig(
+        ): StreamSocketConfig {
+            validateAggregationConfig(
+                aggregationThreshold,
+                aggregationMaxWindowMs,
+                aggregationDispatchQueueCapacity,
+            )
+            return StreamSocketConfig(
                 url = url,
@@
                 aggregationDispatchQueueCapacity = aggregationDispatchQueueCapacity,
             )
+        }
+
+        private fun validateAggregationConfig(
+            aggregationThreshold: Int,
+            aggregationMaxWindowMs: Long,
+            aggregationDispatchQueueCapacity: Int,
+        ) {
+            require(aggregationThreshold > 0) { "aggregationThreshold must be > 0" }
+            require(aggregationMaxWindowMs > 0) { "aggregationMaxWindowMs must be > 0" }
+            require(aggregationDispatchQueueCapacity > 0) {
+                "aggregationDispatchQueueCapacity must be > 0"
+            }
+        }

Apply the same helper in anonymous and custom.

Also applies to: 162-184, 202-226

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamSocketConfig.kt`
around lines 123 - 145, Add validation at the factory boundary by introducing a
shared helper (e.g., validateAggregationConfig(aggregationThreshold: Int,
aggregationMaxWindowMs: Long, aggregationDispatchQueueCapacity: Int)) and call
it from the StreamSocketConfig.jwt, StreamSocketConfig.anonymous and
StreamSocketConfig.custom factory methods before constructing the
StreamSocketConfig; the validator should check that aggregationThreshold > 0,
aggregationMaxWindowMs >= 0, and aggregationDispatchQueueCapacity > 0 (or
whatever project constraints apply) and throw an IllegalArgumentException with a
clear message if any value is invalid so misconfiguration fails fast at the
caller.
🧹 Nitpick comments (1)
stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamEventAggregatorImplTest.kt (1)

87-160: Use coroutines-test and virtual time instead of real dispatchers and blocking sleeps.

These test methods use Dispatchers.Default, Thread.sleep, and CountDownLatch which make tests slower and potentially flaky. Replace with runTest, StandardTestDispatcher(testScheduler), and advanceTimeBy/advanceUntilIdle for deterministic, controllable timing.

Per coding guidelines: "Unit tests live under each module's src/test/java; use MockK and coroutines-test to control timing."

Applies to lines 87–160, 455–500, 532–573.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamEventAggregatorImplTest.kt`
around lines 87 - 160, The test uses real Dispatchers, Thread.sleep and
CountDownLatch causing flakiness; convert the test function `spike triggers
aggregated delivery grouped by type` to use runTest with a
StandardTestDispatcher(testScheduler) (pass that dispatcher/scheduler as the
CoroutineScope to StreamEventAggregator), replace Thread.sleep and
CountDownLatch with test-scheduler advances (advanceTimeBy / advanceUntilIdle)
and coroutine-friendly synchronization (e.g. CompletableDeferred or assertions
after advanceUntilIdle), and remove direct use of Dispatchers.Default — modify
creation of StreamEventAggregator<TestEvent> (symbols: StreamEventAggregator,
typeExtractor, deserializer, aggregationThreshold, maxWindowMs), its
start()/stop()/offer() calls and the onEvent handler to work under runTest and
driven virtual time so the inbox processing is deterministically advanced by
testScheduler rather than sleeping or latching.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamAggregatedEvent.kt`:
- Line 52: The StreamAggregatedEvent constructor currently stores the caller's
Map and List instances directly on the public val events, allowing callers to
mutate backing collections; fix it in StreamAggregatedEvent by performing a
defensive deep copy in the constructor: create a new Map from the supplied
events where each entry's List value is copied to an immutable/defensive List
(e.g., toList()/List.copyOf or Collections.unmodifiableList) and expose that
immutable map (e.g., via toMap()/Map.copyOf or an unmodifiable wrapper) as the
public events property so no external mutations can affect the internal state.

In
`@stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamEventAggregator.kt`:
- Around line 38-42: The KDoc for StreamEventAggregator currently states
aggregation occurs when events "exceed [aggregationThreshold]" but the code and
tests aggregate at or above the threshold; update the KDoc to reflect that
behavior by changing the wording to "at or above [aggregationThreshold]" (or
equivalent) so it matches the implementation; ensure you reference the
aggregationThreshold property and StreamAggregatedEvent in the KDoc comment
above the StreamEventAggregator class/function to remove the off-by-one
ambiguity for SDK consumers.
- Around line 109-126: Validate the input parameters in the
StreamEventAggregator factory function before constructing
StreamEventAggregatorImpl: ensure aggregationThreshold > 0, maxWindowMs > 0,
dispatchQueueCapacity > 0, and inboxCapacity is either Channel.UNLIMITED or >=
1; if any check fails, throw an IllegalArgumentException with a clear message
referencing the offending parameter. Keep these checks in the public
StreamEventAggregator(...) function (the factory that calls
StreamEventAggregatorImpl) so invalid values fail fast prior to channel
construction or aggregator creation.

In
`@stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamEventAggregatorImpl.kt`:
- Line 50: Change StreamEventAggregatorImpl to accept a non-null StreamLogger
via constructor injection instead of using the nullable internal var logger:
StreamLogger?; remove or stop relying on the mutable nullable property and
update all usages (queue-full warnings, handler/deserializer error calls) to use
the constructor-provided logger. Update the factory/component provider that
creates StreamEventAggregatorImpl to pass the tagged StreamLogger instance (use
the same pattern where other components obtain a tagged logger) and ensure other
similar classes/occurrences referenced around the file (the members at the
locations corresponding to lines ~104-107) are also converted to
constructor-injected non-null loggers. Ensure logs follow the tagging guideline
and do not include secrets/tokens.
- Around line 53-87: stop() currently closes inbox and dispatchQueue but resets
started to false, so subsequent offer() calls auto-start with closed channels
and dead jobs; fix by either making stop() terminal or ensuring full restart by
recreating channels and jobs in start(). Specifically, choose one approach: (A)
make stop() terminal by setting a separate AtomicBoolean like permanentlyStopped
(set to true in stop()) and update offer()/start() to no-op when
permanentlyStopped is true; or (B) make start() reinitialize inbox and
dispatchQueue (new Channel(...) instances), ensure collectorJob/dispatcherJob
are launched fresh from scope (calling runCollector/runDispatcher), and have
stop() cancel jobs and close old channels but not allow leftover closed
references (also clear collectorJob/dispatcherJob). Update checks in offer() to
guard with inbox.isClosedForSend (or permanentlyStopped) to avoid trying to send
to closed channels. Use the symbols inbox, dispatchQueue, started, stop(),
start(), offer(), collectorJob, dispatcherJob, runCollector, runDispatcher,
scope.
- Around line 59-63: The mutable var eventHandler must be made thread-safe:
replace the plain declaration with an AtomicReference<suspend (Any) -> Unit>
(initialized with the no-op) and update onEvent(handler: suspend (Any) -> Unit)
to call eventHandler.set(handler); also update any reads (e.g., in
runDispatcher() or where the handler is invoked) to use
eventHandler.get().invoke(...) (or get() first into a local val) so registration
and invocation are safely published across threads.

In
`@stream-android-core/src/main/java/io/getstream/android/core/internal/socket/StreamSocketSession.kt`:
- Around line 461-527: The subscriptionManager.forEach calls in
handleSingleCompositeEvent and handleAggregatedEvent currently ignore the Result
returned and thus drop listener exceptions; update each
subscriptionManager.forEach invocation (both the individual core dispatch in
handleSingleCompositeEvent, the core dispatch inside handleAggregatedEvent, and
the final productAggregated dispatch) to capture the Result and call onFailure
to log the error (e.g., use Result.onFailure { logger.e(it) { "Listener dispatch
failed for <core/product/aggregated> event: $event or $type" } } ), ensuring all
listener.onEvent(...) invocations have failure logging instead of being silently
discarded.
- Around line 88-95: The code creates an IllegalStateException with the raw
WebSocket payload and logs it in onMessage (see aggregator.offer(text),
IllegalStateException(...), logger.e(...), disconnect(error)), which can leak
sensitive data; change the error and log to omit the raw payload and instead
record safe metadata (e.g., payload length, message type, a short non-reversible
hash or correlation id, and a note that the payload was dropped). Update the
IllegalStateException message and the logger.e call to include only that
sanitized metadata (no raw text or tokens) before calling disconnect(error).
- Around line 236-245: The aggregator handler is registered with
aggregator.onEvent but the aggregator is never started, so its
collector/dispatcher coroutines may never run; update StreamSocketSession to
call aggregator.start() immediately after registering the onEvent handler
(before any offer calls) so events are processed; ensure you still stop/cleanup
the aggregator in the existing cleanup code and reference the existing symbols
aggregator, onEvent, handleAggregatedEvent, and handleSingleCompositeEvent when
making the change.
- Around line 530-536: cleanup() currently calls aggregator.stop() and drops the
returned Result, so any shutdown failures are invisible; update cleanup() to
capture the Result from aggregator.stop() (after healthMonitor.stop()), inspect
it and log failures via logger.e (include the Result's throwable/message and
contextual text like "[cleanup] Aggregator stop failed"), and if necessary
propagate or handle cancellation consistently with other teardown steps;
reference the cleanup() method, cleaned.compareAndSet(...),
healthMonitor.stop(), aggregator.stop(), and logger.d/logger.e when making the
change.

In
`@stream-android-core/src/test/java/io/getstream/android/core/api/StreamClientConfigFactoryTest.kt`:
- Around line 192-208: Test currently only checks aggregator exists; update it
to assert the aggregator's configured values match the custom StreamSocketConfig
by using the existing private-field test helpers: after obtaining aggregator via
(client as StreamClientImpl<*>).readPrivateField("socketSession") and then
.readPrivateField("aggregator"), read the aggregator's internal fields (e.g.,
"threshold" or "aggregationThreshold", "maxWindowMs" or
"aggregationMaxWindowMs", and "dispatchQueueCapacity" or
"aggregationDispatchQueueCapacity") using readPrivateField and add assertions
like assertEquals(20, threshold), assertEquals(200L, maxWindowMs), and
assertEquals(8, dispatchQueueCapacity) so the test verifies the custom values
instead of only presence.

In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamEventAggregatorImplTest.kt`:
- Around line 532-573: The test dispatch queue fullness isn't asserted: update
the test for StreamEventAggregator (created with dispatchQueueCapacity = 1) to
inject a test logger or mock into the aggregator so you can assert a warning was
logged when the queue was full, and also assert that the number of delivered
events collected by the onEvent handler (received) is strictly less than the
number of events offered via offer (i.e., delivered < total offered) to prove
drops occurred; keep start/stop and the slow onEvent handler and use the
existing latch to ensure at least one delivery while verifying the log warning
and reduced delivery count after the bursts.

In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/socket/StreamSocketSessionTest.kt`:
- Around line 916-920: Tighten the assertions in StreamSocketSessionTest to
verify the core "connection.error" is forwarded as an individual event and NOT
included inside the product aggregate: assert there is exactly one
StreamClientConnectionErrorEvent in seenEvents (or use
seenEvents.filterIsInstance<StreamClientConnectionErrorEvent>().size == 1),
assert there is at least one StreamAggregatedEvent but that none of its inner
events/buckets represent the core "connection.error" bucket (inspect the
aggregated event payloads/keys), and keep the existing Disconnected state check;
update references to seenEvents, StreamClientConnectionErrorEvent,
StreamAggregatedEvent and StreamConnectionState.Disconnected to locate and
change the test.

---

Outside diff comments:
In
`@stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt`:
- Around line 66-83: The StreamComponentProvider currently accepts a generic
StreamEventAggregator<*> which allows aggregators that emit unsupported event
types; restrict the contract to the exact composite socket event type used by
the session (or hide the aggregator behind a typed factory). Update the
StreamComponentProvider constructor parameter from StreamEventAggregator<*> to a
concrete type matching the SDK socket payload (e.g.,
StreamEventAggregator<StreamSocketEventComposite> or an interface like
StreamCompositeEventAggregator) and adjust any callers, or replace the parameter
with a factory/provider that receives the SDK parser and returns a
correctly-typed aggregator; ensure StreamSocketSession is updated to consume
only that concrete aggregator type so custom aggregators that emit non-composite
events are compile-time rejected.

In
`@stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamSocketConfig.kt`:
- Around line 123-145: Add validation at the factory boundary by introducing a
shared helper (e.g., validateAggregationConfig(aggregationThreshold: Int,
aggregationMaxWindowMs: Long, aggregationDispatchQueueCapacity: Int)) and call
it from the StreamSocketConfig.jwt, StreamSocketConfig.anonymous and
StreamSocketConfig.custom factory methods before constructing the
StreamSocketConfig; the validator should check that aggregationThreshold > 0,
aggregationMaxWindowMs >= 0, and aggregationDispatchQueueCapacity > 0 (or
whatever project constraints apply) and throw an IllegalArgumentException with a
clear message if any value is invalid so misconfiguration fails fast at the
caller.

In
`@stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt`:
- Line 196: Update the KDoc for StreamClient.kt's constructor parameter
socketConfig to replace "batching" terminology with the new semantics: mention
aggregation/event processing (or processor/queue behaviour) instead of batching
tunables; specifically update the `@param` socketConfig text that currently
references "batching" to describe aggregation/event processing configuration
(URL, auth, timing, event aggregation/processing) and add a brief note to refer
to processor/queue docs if needed, ensuring the symbol StreamSocketConfig is
referenced by name in the KDoc.

---

Nitpick comments:
In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamEventAggregatorImplTest.kt`:
- Around line 87-160: The test uses real Dispatchers, Thread.sleep and
CountDownLatch causing flakiness; convert the test function `spike triggers
aggregated delivery grouped by type` to use runTest with a
StandardTestDispatcher(testScheduler) (pass that dispatcher/scheduler as the
CoroutineScope to StreamEventAggregator), replace Thread.sleep and
CountDownLatch with test-scheduler advances (advanceTimeBy / advanceUntilIdle)
and coroutine-friendly synchronization (e.g. CompletableDeferred or assertions
after advanceUntilIdle), and remove direct use of Dispatchers.Default — modify
creation of StreamEventAggregator<TestEvent> (symbols: StreamEventAggregator,
typeExtractor, deserializer, aggregationThreshold, maxWindowMs), its
start()/stop()/offer() calls and the onEvent handler to work under runTest and
driven virtual time so the inbox processing is deterministically advanced by
testScheduler rather than sleeping or latching.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: d8c93c2f-ced1-4f19-821c-207e5c4a2a17

📥 Commits

Reviewing files that changed from the base of the PR and between 332de07 and 0fb3a73.

📒 Files selected for processing (13)
  • stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt
  • stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt
  • stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamSocketConfig.kt
  • stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamAggregatedEvent.kt
  • stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamEventAggregator.kt
  • stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamEventAggregatorImpl.kt
  • stream-android-core/src/main/java/io/getstream/android/core/internal/serialization/StreamCompositeEventSerializationImpl.kt
  • stream-android-core/src/main/java/io/getstream/android/core/internal/socket/StreamSocketSession.kt
  • stream-android-core/src/test/java/io/getstream/android/core/api/StreamClientConfigFactoryTest.kt
  • stream-android-core/src/test/java/io/getstream/android/core/api/StreamClientFactoryTest.kt
  • stream-android-core/src/test/java/io/getstream/android/core/api/model/config/StreamSocketConfigTest.kt
  • stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamEventAggregatorImplTest.kt
  • stream-android-core/src/test/java/io/getstream/android/core/internal/socket/StreamSocketSessionTest.kt

… sanitized logging

- Defensive copy in StreamAggregatedEvent (mutable lists no longer leak)
- AtomicReference for eventHandler (thread-safe handler registration)
- Terminal stop() with closed guard (prevent resurrection after stop)
- Parameter validation in factory (require threshold/window/capacity > 0)
- Sanitize socket message logging (log byte count, not raw payload)
- Explicit aggregator.start() in session connect (not just auto-start)
- Handle listener dispatch Result failures with onFailure logging
- Log aggregator.stop() failures in cleanup
- KDoc "exceed" → "reach or exceed" for threshold semantics
…, add 10K stress test

- Replace Channel with StreamRestartableChannel for inbox and dispatch
  queue — supports stop/start lifecycle across socket reconnect cycles
- Remove auto-start from offer() — session manages lifecycle explicitly
- Remove terminal closed guard (no longer needed with restartable channels)
- Add braces to all single-line if statements
- Add 10K event stress test: 10,000 events → 200 handler calls (50x
  reduction), 129ms total processing time
- Replace "offer returns false after stop" test with "stop then start
  resumes processing" test verifying restartable behavior
- All events delivered (count == offered)
- Each batch ≤ threshold events
- Fewer handler calls than raw events
@aleksandar-apostolov aleksandar-apostolov changed the title Replace StreamBatcher with StreamEventAggregator instead Replace StreamBatcher with StreamEventAggregator instead in the StreamSocketSession Apr 22, 2026
- Wire logger through factory constructor (no longer nullable internal var only)
- Assert custom aggregation values in config factory test (not just existence)
- Assert queue-full warning logged in dispatch queue test
- Tighten aggregate shape assertion in session test (verify core events
  excluded from product aggregate, verify exact product event contents)
Test all require() guards: aggregationThreshold, maxWindowMs, and
dispatchQueueCapacity reject zero and negative values.
Replace two loose lambdas (typeExtractor + deserializer) and separate
config values with a validated policy object. Policy combines behavior
(type extraction, deserialization) with tuning (threshold, window,
queue capacity). Validated at construction via require() — once you
have a policy, it's guaranteed valid.

StreamSocketConfig keeps the tuning values for product SDK
configuration. The factory builds the policy internally from config +
event parser. Custom aggregator injection via StreamComponentProvider
bypasses the policy entirely.
Replace manual try/catch blocks with runCatchingCancellable to ensure
CancellationException is always rethrown (structured concurrency) and
no catch blocks are left empty. All failure paths now log via onFailure.
…external calls

Collector and dispatcher loops catch ClosedReceiveChannelException
explicitly for graceful shutdown logging. CancellationException
propagates naturally (structured concurrency). runCatchingCancellable
is used only for external code (handler, deserializer, type extractor)
that can throw unexpectedly.
- offer before start returns false
- double start is idempotent (no duplicate workers)
- handler exception on aggregated event does not break dispatcher

Line coverage: 89% → 93%, Branch: 74% → 78%, Instruction: 91% → 92%.
The impl now takes StreamEventAggregationPolicy<T> instead of 5
separate fields. Policy is the single source of truth for behavior
and configuration — no unpacking at the boundary.
Fields moved from impl to policy — reflection path updated.
Remove unnecessary Thread.sleep, assert each offer succeeds, increase
latch timeout from 30s to 60s for loaded CI runners.
@sonarqubecloud
Copy link
Copy Markdown

Copy link
Copy Markdown
Collaborator

@gpunto gpunto left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, just a comment on the aggregation type

* @property events Events grouped by type string. Each list preserves arrival order.
*/
@StreamInternalApi
public class StreamAggregatedEvent<T>(events: Map<String, List<T>>) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we can aggregate into a map at the core level since SDKs might rely on the order of events. For example, we might receive "reaction X added" -> "reaction X removed" and if we process them in the opposite order we end up in the wrong state. This can happen even if we use LinkedHashMap, for example if we receive this sequence

  1. reaction removed for X
  2. reaction added for Y
  3. reaction removed for Y

Insertion order would put the "reaction removed" key before "reaction added", but for Y "added" should be processed before "removed"

I think we can aggregate into a simple List, and then clients can implement some kind of transaction operation where they apply all updates sequentially but in one state update, so they still emit only once.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

pr:breaking-change Breaking change

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants