Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
14 commits
Select commit Hold shift + click to select a range
6706625
feat(processing): replace StreamBatcher with StreamEventAggregator
aleksandar-apostolov Apr 22, 2026
0fb3a73
fix(processing): harden aggregator against throwing extractors/deseri…
aleksandar-apostolov Apr 22, 2026
7dae14c
fix(processing): address review feedback — thread safety, validation,…
aleksandar-apostolov Apr 22, 2026
2ce0b8b
refactor(processing): use StreamRestartableChannel, remove auto-start…
aleksandar-apostolov Apr 22, 2026
eae6cf4
test(processing): assert all three stress test guarantees
aleksandar-apostolov Apr 22, 2026
ef95b74
fix(processing): address remaining review comments
aleksandar-apostolov Apr 22, 2026
5f51987
test(processing): add factory parameter validation tests
aleksandar-apostolov Apr 22, 2026
398efc3
refactor(processing): introduce StreamEventAggregationPolicy
aleksandar-apostolov Apr 22, 2026
a069e0a
fix(processing): use runCatchingCancellable throughout aggregator
aleksandar-apostolov Apr 22, 2026
034ed06
fix(processing): explicit shutdown catch, runCatchingCancellable for …
aleksandar-apostolov Apr 22, 2026
729ff79
test(processing): close coverage gaps in StreamEventAggregatorImpl
aleksandar-apostolov Apr 22, 2026
bcb810c
refactor(processing): pass policy directly to StreamEventAggregatorImpl
aleksandar-apostolov Apr 22, 2026
843c8d3
fix(test): read policy fields through aggregator.policy in config test
aleksandar-apostolov Apr 23, 2026
8bd6aef
fix(test): make stress test resilient to slow CI
aleksandar-apostolov Apr 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
import io.getstream.android.core.api.model.connection.network.StreamNetworkState
import io.getstream.android.core.api.observers.lifecycle.StreamLifecycleMonitor
import io.getstream.android.core.api.observers.network.StreamNetworkMonitor
import io.getstream.android.core.api.processing.StreamBatcher
import io.getstream.android.core.api.processing.StreamEventAggregationPolicy
import io.getstream.android.core.api.processing.StreamEventAggregator
import io.getstream.android.core.api.processing.StreamSerialProcessingQueue
import io.getstream.android.core.api.processing.StreamSingleFlightProcessor
import io.getstream.android.core.api.recovery.StreamConnectionRecoveryEvaluator
Expand Down Expand Up @@ -249,14 +250,7 @@
socketFactory =
components.socketFactory
?: StreamWebSocketFactory(logger = logProvider.taggedLogger("SCWebSocketFactory")),
batcher =
components.batcher
?: StreamBatcher(
scope = scope,
batchSize = socketConfig.batchSize,
initialDelayMs = socketConfig.batchInitialDelayMs,
maxDelayMs = socketConfig.batchMaxDelayMs,
),
eventAggregator = components.eventAggregator,
healthMonitor =
components.healthMonitor
?: StreamHealthMonitor(
Expand All @@ -277,7 +271,7 @@
*/
@Suppress("LongParameterList", "LongMethod")
@SuppressLint("ExposeAsStateFlow")
internal fun createStreamClientInternal(

Check warning on line 274 in stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This function has 21 parameters, which is greater than the 7 authorized.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-core-android&issues=AZ20cxAncWgsRMRVWiml&open=AZ20cxAncWgsRMRVWiml&pullRequest=57

// Android
scope: CoroutineScope,
Expand Down Expand Up @@ -320,13 +314,7 @@
connectionIdHolder: StreamConnectionIdHolder = StreamConnectionIdHolder(),
socketFactory: StreamWebSocketFactory =
StreamWebSocketFactory(logger = logProvider.taggedLogger("SCWebSocketFactory")),
batcher: StreamBatcher<String> =
StreamBatcher(
scope = scope,
batchSize = socketConfig.batchSize,
initialDelayMs = socketConfig.batchInitialDelayMs,
maxDelayMs = socketConfig.batchMaxDelayMs,
),
eventAggregator: StreamEventAggregator<*>? = null,

// Monitoring
healthMonitor: StreamHealthMonitor =
Expand Down Expand Up @@ -427,6 +415,28 @@
),
)

val eventParser =
StreamCompositeEventSerializationImpl(
internal =
serializationConfig.eventParser ?: StreamEventSerialization(compositeSerialization),
external = serializationConfig.productEventSerializers,
)

val resolvedAggregator =
eventAggregator
?: StreamEventAggregator(
scope = clientScope,
policy =
StreamEventAggregationPolicy.from(
typeExtractor = { raw -> eventParser.peekType(raw) },
deserializer = { raw -> eventParser.deserialize(raw) },
aggregationThreshold = socketConfig.aggregationThreshold,
maxWindowMs = socketConfig.aggregationMaxWindowMs,
dispatchQueueCapacity = socketConfig.aggregationDispatchQueueCapacity,
),
logger = logProvider.taggedLogger("SCEventAggregator"),
)

val mutableConnectionState = MutableStateFlow<StreamConnectionState>(StreamConnectionState.Idle)
return StreamClientImpl(
user = user,
Expand All @@ -446,15 +456,9 @@
products = products,
config = socketConfig,
jsonSerialization = compositeSerialization,
eventParser =
StreamCompositeEventSerializationImpl(
internal =
serializationConfig.eventParser
?: StreamEventSerialization(compositeSerialization),
external = serializationConfig.productEventSerializers,
),
eventParser = eventParser,
healthMonitor = healthMonitor,
batcher = batcher,
aggregator = resolvedAggregator,
internalSocket = socket,
subscriptionManager =
StreamSubscriptionManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import io.getstream.android.core.api.components.StreamAndroidComponentsProvider
import io.getstream.android.core.api.log.StreamLoggerProvider
import io.getstream.android.core.api.observers.lifecycle.StreamLifecycleMonitor
import io.getstream.android.core.api.observers.network.StreamNetworkMonitor
import io.getstream.android.core.api.processing.StreamBatcher
import io.getstream.android.core.api.processing.StreamEventAggregator
import io.getstream.android.core.api.processing.StreamSerialProcessingQueue
import io.getstream.android.core.api.processing.StreamSingleFlightProcessor
import io.getstream.android.core.api.recovery.StreamConnectionRecoveryEvaluator
Expand Down Expand Up @@ -63,7 +63,7 @@ import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
* @param tokenManager Token lifecycle manager.
* @param connectionIdHolder Connection ID storage.
* @param socketFactory WebSocket factory.
* @param batcher WebSocket message batcher.
* @param eventAggregator WebSocket event aggregator.
* @param healthMonitor Connection health monitor.
* @param networkMonitor Network connectivity monitor.
* @param lifecycleMonitor App lifecycle monitor.
Expand All @@ -80,7 +80,7 @@ public data class StreamComponentProvider(
val tokenManager: StreamTokenManager? = null,
val connectionIdHolder: StreamConnectionIdHolder? = null,
val socketFactory: StreamWebSocketFactory? = null,
val batcher: StreamBatcher<String>? = null,
val eventAggregator: StreamEventAggregator<*>? = null,
val healthMonitor: StreamHealthMonitor? = null,
val networkMonitor: StreamNetworkMonitor? = null,
val lifecycleMonitor: StreamLifecycleMonitor? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
* Configuration for the Stream WebSocket connection.
*
* Holds both **identity** (URL, API key, auth type) and **operational** tunables (health check
* timing, batching, connection timeout). Products pass this to the [StreamClient] factory to
* describe their socket.
* timing, event aggregation, connection timeout). Products pass this to the [StreamClient] factory
* to describe their socket.
*
* ### Usage
*
Expand All @@ -38,15 +38,16 @@
* clientInfoHeader = clientInfo,
* )
*
* // SFU socket — aggressive timing, no batching
* // SFU socket — aggressive timing, low aggregation threshold
* val sfuSocket = StreamSocketConfig.jwt(
* url = StreamWsUrl.fromString("wss://sfu.stream-io-api.com"),
* apiKey = apiKey,
* clientInfoHeader = clientInfo,
* healthCheckIntervalMs = 5_000,
* livenessThresholdMs = 15_000,
* connectionTimeoutMs = 2_000,
* batchSize = 1,
* aggregationThreshold = 10,
* aggregationMaxWindowMs = 200,
* )
* ```
*
Expand All @@ -58,9 +59,12 @@
* @param livenessThresholdMs Time without a health check ack before the connection is considered
* unhealthy in milliseconds.
* @param connectionTimeoutMs WebSocket connection timeout in milliseconds.
* @param batchSize Maximum number of WebSocket messages to batch before flushing.
* @param batchInitialDelayMs Initial debounce window for batching in milliseconds.
* @param batchMaxDelayMs Maximum debounce window for batching in milliseconds.
* @param aggregationThreshold Number of accumulated events that triggers aggregated delivery
* instead of individual dispatch.
* @param aggregationMaxWindowMs Maximum time the aggregator collects events before delivering. This
* is the latency ceiling in milliseconds.
* @param aggregationDispatchQueueCapacity Bounded capacity of the dispatch queue between the
* aggregator's collector and dispatcher coroutines.
*/
@Suppress("LongParameterList")
@StreamInternalApi
Expand All @@ -74,9 +78,9 @@
val healthCheckIntervalMs: Long = DEFAULT_HEALTH_INTERVAL_MS,
val livenessThresholdMs: Long = DEFAULT_LIVENESS_MS,
val connectionTimeoutMs: Long = DEFAULT_CONNECTION_TIMEOUT_MS,
val batchSize: Int = DEFAULT_BATCH_SIZE,
val batchInitialDelayMs: Long = DEFAULT_BATCH_INIT_DELAY_MS,
val batchMaxDelayMs: Long = DEFAULT_BATCH_MAX_DELAY_MS,
val aggregationThreshold: Int = DEFAULT_AGGREGATION_THRESHOLD,
val aggregationMaxWindowMs: Long = DEFAULT_AGGREGATION_MAX_WINDOW_MS,
val aggregationDispatchQueueCapacity: Int = DEFAULT_AGGREGATION_DISPATCH_QUEUE_CAPACITY,
) {
/** Default values for [StreamSocketConfig] fields. */
public companion object {
Expand All @@ -92,14 +96,14 @@
/** Default connection timeout: 10 seconds. */
public const val DEFAULT_CONNECTION_TIMEOUT_MS: Long = 10_000L

/** Default batch size: 10 messages. */
public const val DEFAULT_BATCH_SIZE: Int = 10
/** Default aggregation threshold: 50 events trigger aggregated delivery. */
public const val DEFAULT_AGGREGATION_THRESHOLD: Int = 50

/** Default initial batch delay: 100ms. */
public const val DEFAULT_BATCH_INIT_DELAY_MS: Long = 100L
/** Default aggregation max window: 500ms latency ceiling. */
public const val DEFAULT_AGGREGATION_MAX_WINDOW_MS: Long = 500L

/** Default max batch delay: 1 second. */
public const val DEFAULT_BATCH_MAX_DELAY_MS: Long = 1_000L
/** Default dispatch queue capacity: 16 items. */
public const val DEFAULT_AGGREGATION_DISPATCH_QUEUE_CAPACITY: Int = 16

/**
* Creates a JWT-based [StreamSocketConfig].
Expand All @@ -110,22 +114,22 @@
* @param healthCheckIntervalMs Interval between health check pings in milliseconds.
* @param livenessThresholdMs Liveness threshold in milliseconds.
* @param connectionTimeoutMs WebSocket connection timeout in milliseconds.
* @param batchSize Maximum batch size before flush.
* @param batchInitialDelayMs Initial debounce window in milliseconds.
* @param batchMaxDelayMs Maximum debounce window in milliseconds.
* @param aggregationThreshold Events before aggregated delivery triggers.
* @param aggregationMaxWindowMs Maximum collection window in milliseconds.
* @param aggregationDispatchQueueCapacity Dispatch queue capacity.
* @return A JWT-based [StreamSocketConfig].
*/
@Suppress("LongParameterList")
public fun jwt(

Check warning on line 123 in stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamSocketConfig.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This function has 9 parameters, which is greater than the 7 authorized.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-core-android&issues=AZ20cxAEcWgsRMRVWimi&open=AZ20cxAEcWgsRMRVWimi&pullRequest=57
url: StreamWsUrl,
apiKey: StreamApiKey,
clientInfoHeader: StreamHttpClientInfoHeader,
healthCheckIntervalMs: Long = DEFAULT_HEALTH_INTERVAL_MS,
livenessThresholdMs: Long = DEFAULT_LIVENESS_MS,
connectionTimeoutMs: Long = DEFAULT_CONNECTION_TIMEOUT_MS,
batchSize: Int = DEFAULT_BATCH_SIZE,
batchInitialDelayMs: Long = DEFAULT_BATCH_INIT_DELAY_MS,
batchMaxDelayMs: Long = DEFAULT_BATCH_MAX_DELAY_MS,
aggregationThreshold: Int = DEFAULT_AGGREGATION_THRESHOLD,
aggregationMaxWindowMs: Long = DEFAULT_AGGREGATION_MAX_WINDOW_MS,
aggregationDispatchQueueCapacity: Int = DEFAULT_AGGREGATION_DISPATCH_QUEUE_CAPACITY,
): StreamSocketConfig =
StreamSocketConfig(
url = url,
Expand All @@ -135,9 +139,9 @@
healthCheckIntervalMs = healthCheckIntervalMs,
livenessThresholdMs = livenessThresholdMs,
connectionTimeoutMs = connectionTimeoutMs,
batchSize = batchSize,
batchInitialDelayMs = batchInitialDelayMs,
batchMaxDelayMs = batchMaxDelayMs,
aggregationThreshold = aggregationThreshold,
aggregationMaxWindowMs = aggregationMaxWindowMs,
aggregationDispatchQueueCapacity = aggregationDispatchQueueCapacity,
)

/**
Expand All @@ -149,22 +153,22 @@
* @param healthCheckIntervalMs Interval between health check pings in milliseconds.
* @param livenessThresholdMs Liveness threshold in milliseconds.
* @param connectionTimeoutMs WebSocket connection timeout in milliseconds.
* @param batchSize Maximum batch size before flush.
* @param batchInitialDelayMs Initial debounce window in milliseconds.
* @param batchMaxDelayMs Maximum debounce window in milliseconds.
* @param aggregationThreshold Events before aggregated delivery triggers.
* @param aggregationMaxWindowMs Maximum collection window in milliseconds.
* @param aggregationDispatchQueueCapacity Dispatch queue capacity.
* @return An anonymous [StreamSocketConfig].
*/
@Suppress("LongParameterList")
public fun anonymous(

Check warning on line 162 in stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamSocketConfig.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This function has 9 parameters, which is greater than the 7 authorized.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-core-android&issues=AZ20cxAEcWgsRMRVWimj&open=AZ20cxAEcWgsRMRVWimj&pullRequest=57
url: StreamWsUrl,
apiKey: StreamApiKey,
clientInfoHeader: StreamHttpClientInfoHeader,
healthCheckIntervalMs: Long = DEFAULT_HEALTH_INTERVAL_MS,
livenessThresholdMs: Long = DEFAULT_LIVENESS_MS,
connectionTimeoutMs: Long = DEFAULT_CONNECTION_TIMEOUT_MS,
batchSize: Int = DEFAULT_BATCH_SIZE,
batchInitialDelayMs: Long = DEFAULT_BATCH_INIT_DELAY_MS,
batchMaxDelayMs: Long = DEFAULT_BATCH_MAX_DELAY_MS,
aggregationThreshold: Int = DEFAULT_AGGREGATION_THRESHOLD,
aggregationMaxWindowMs: Long = DEFAULT_AGGREGATION_MAX_WINDOW_MS,
aggregationDispatchQueueCapacity: Int = DEFAULT_AGGREGATION_DISPATCH_QUEUE_CAPACITY,
): StreamSocketConfig =
StreamSocketConfig(
url = url,
Expand All @@ -174,9 +178,9 @@
healthCheckIntervalMs = healthCheckIntervalMs,
livenessThresholdMs = livenessThresholdMs,
connectionTimeoutMs = connectionTimeoutMs,
batchSize = batchSize,
batchInitialDelayMs = batchInitialDelayMs,
batchMaxDelayMs = batchMaxDelayMs,
aggregationThreshold = aggregationThreshold,
aggregationMaxWindowMs = aggregationMaxWindowMs,
aggregationDispatchQueueCapacity = aggregationDispatchQueueCapacity,
)

/**
Expand All @@ -189,13 +193,13 @@
* @param healthCheckIntervalMs Interval between health check pings in milliseconds.
* @param livenessThresholdMs Liveness threshold in milliseconds.
* @param connectionTimeoutMs WebSocket connection timeout in milliseconds.
* @param batchSize Maximum batch size before flush.
* @param batchInitialDelayMs Initial debounce window in milliseconds.
* @param batchMaxDelayMs Maximum debounce window in milliseconds.
* @param aggregationThreshold Events before aggregated delivery triggers.
* @param aggregationMaxWindowMs Maximum collection window in milliseconds.
* @param aggregationDispatchQueueCapacity Dispatch queue capacity.
* @return A custom [StreamSocketConfig].
*/
@Suppress("LongParameterList")
public fun custom(

Check warning on line 202 in stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamSocketConfig.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This function has 10 parameters, which is greater than the 7 authorized.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-core-android&issues=AZ20cxAEcWgsRMRVWimk&open=AZ20cxAEcWgsRMRVWimk&pullRequest=57
url: StreamWsUrl,
apiKey: StreamApiKey,
authType: String,
Expand All @@ -203,9 +207,9 @@
healthCheckIntervalMs: Long = DEFAULT_HEALTH_INTERVAL_MS,
livenessThresholdMs: Long = DEFAULT_LIVENESS_MS,
connectionTimeoutMs: Long = DEFAULT_CONNECTION_TIMEOUT_MS,
batchSize: Int = DEFAULT_BATCH_SIZE,
batchInitialDelayMs: Long = DEFAULT_BATCH_INIT_DELAY_MS,
batchMaxDelayMs: Long = DEFAULT_BATCH_MAX_DELAY_MS,
aggregationThreshold: Int = DEFAULT_AGGREGATION_THRESHOLD,
aggregationMaxWindowMs: Long = DEFAULT_AGGREGATION_MAX_WINDOW_MS,
aggregationDispatchQueueCapacity: Int = DEFAULT_AGGREGATION_DISPATCH_QUEUE_CAPACITY,
): StreamSocketConfig {
require(authType.isNotBlank()) { "Auth type must not be blank" }
return StreamSocketConfig(
Expand All @@ -216,9 +220,9 @@
healthCheckIntervalMs = healthCheckIntervalMs,
livenessThresholdMs = livenessThresholdMs,
connectionTimeoutMs = connectionTimeoutMs,
batchSize = batchSize,
batchInitialDelayMs = batchInitialDelayMs,
batchMaxDelayMs = batchMaxDelayMs,
aggregationThreshold = aggregationThreshold,
aggregationMaxWindowMs = aggregationMaxWindowMs,
aggregationDispatchQueueCapacity = aggregationDispatchQueueCapacity,
)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2014-2026 Stream.io Inc. All rights reserved.
*
* Licensed under the Stream License;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/GetStream/stream-core-android/blob/main/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.getstream.android.core.api.processing

import io.getstream.android.core.annotations.StreamInternalApi

/**
* A collection of events grouped by their type, produced by [StreamEventAggregator] during a
* traffic spike.
*
* When the aggregator detects high event throughput, it collects events within a time window and
* delivers them as a single [StreamAggregatedEvent] instead of dispatching each individually. This
* allows product SDKs to apply one state update and one UI recomposition per window instead of one
* per event.
*
* During normal (low) traffic, events flow through individually — this class is only used during
* spikes.
*
* ### Usage
*
* ```kotlin
* when (event) {
* is StreamAggregatedEvent<*> -> {
* event.events.forEach { (type, eventsOfType) ->
* when (type) {
* "channel.updated" -> applyLatest(eventsOfType)
* "message.new" -> processAll(eventsOfType)
* }
* }
* }
* else -> handleSingleEvent(event)
* }
* ```
*
* @param T The type of the individual events.
* @property events Events grouped by type string. Each list preserves arrival order.
*/
@StreamInternalApi
public class StreamAggregatedEvent<T>(events: Map<String, List<T>>) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we can aggregate into a map at the core level since SDKs might rely on the order of events. For example, we might receive "reaction X added" -> "reaction X removed" and if we process them in the opposite order we end up in the wrong state. This can happen even if we use LinkedHashMap, for example if we receive this sequence

  1. reaction removed for X
  2. reaction added for Y
  3. reaction removed for Y

Insertion order would put the "reaction removed" key before "reaction added", but for Y "added" should be processed before "removed"

I think we can aggregate into a simple List, and then clients can implement some kind of transaction operation where they apply all updates sequentially but in one state update, so they still emit only once.

public val events: Map<String, List<T>> = events.mapValues { (_, v) -> v.toList() }
}
Loading
Loading