Unify reconnect pipeline and harden network transition handling #1645
PratimMallick wants to merge 30 commits into develop from
Conversation
…d tracer improvements
Replace SignalLostSignalingServiceDecorator with RetryableSignalingServiceDecorator that uses configurable retry policies (exponential backoff) and separates terminal SFU errors from transient network failures. Refactor SignalingServiceTracerDecorator to use a generic traced() helper for consistent request/response/exception tracing. Clean up Publisher/Subscriber error handling and fix sourceSets configuration.
Made-with: Cursor
Replace immediate teardown of the old RtcSession during migration with a phased approach: enterMigration() keeps media flowing while listening for ParticipantMigrationCompleteEvent from the old SFU; finalizeMigration() tears down only after the handoff is confirmed (or timed out at 7s). Refactor prepareRejoin(reason) to send final stats, leave gracefully, and clean up via the unified cleanup() path. Move mediaScope.cancel() from leaveWithReason() into cleanup() so it is always invoked regardless of the teardown path (rejoin, migrate, or explicit leave).
Made-with: Cursor
Consolidate fastReconnect, rejoin, and migrate into a single Call.reconnect(strategy, reason) entry point with a retry loop that mirrors the JS SDK:
- FAST reconnect is attempted up to MAX_FAST_RECONNECT_ATTEMPTS (3), then escalates to REJOIN.
- MIGRATE failures also escalate to REJOIN.
- A global reconnectMutex replaces the old schedule()/SingleFlight coalescing to provide mutual exclusion across all strategies.
- MAX_RECONNECT_ATTEMPTS (10) and leaveAfterDisconnectSeconds act as circuit breakers; exhaustion sets ReconnectingFailed state.
- Public fastReconnect(), rejoin(), migrate() wrappers are preserved for backward compatibility.
Made-with: Cursor
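The escalation rules in the commit message above can be illustrated with a minimal, synchronous sketch. It is not the PR's implementation: `Strategy`, `Outcome`, `reconnectLoop`, and the `attempt` callback are hypothetical names, the mutex, backoff, and `leaveAfterDisconnectSeconds` timer are omitted, and counting every iteration against `MAX_RECONNECT_ATTEMPTS` is a simplifying assumption (the PR's tests indicate FAST attempts are counted differently).

```kotlin
enum class Strategy { FAST, REJOIN, MIGRATE }

sealed interface Outcome {
    data class Connected(val strategy: Strategy, val attempts: Int) : Outcome
    data class Failed(val attempts: Int) : Outcome
}

// Values taken from the commit message above.
const val MAX_FAST_RECONNECT_ATTEMPTS = 3
const val MAX_RECONNECT_ATTEMPTS = 10

// `attempt` is a hypothetical stand-in for the per-strategy reconnect call;
// it returns true when the connection was restored.
fun reconnectLoop(initial: Strategy, attempt: (Strategy) -> Boolean): Outcome {
    var strategy = initial
    var fastFailures = 0
    var attempts = 0
    while (attempts < MAX_RECONNECT_ATTEMPTS) {
        attempts++
        if (attempt(strategy)) return Outcome.Connected(strategy, attempts)
        strategy = when (strategy) {
            Strategy.FAST -> {
                // Escalate once the FAST budget is used up.
                fastFailures++
                if (fastFailures >= MAX_FAST_RECONNECT_ATTEMPTS) Strategy.REJOIN else Strategy.FAST
            }
            // A failed MIGRATE escalates to REJOIN; REJOIN keeps retrying as REJOIN.
            Strategy.MIGRATE, Strategy.REJOIN -> Strategy.REJOIN
        }
    }
    // Circuit breaker: the caller would set the ReconnectingFailed state here.
    return Outcome.Failed(attempts)
}
```

With a callback that only succeeds for REJOIN, a loop started with FAST tries FAST three times and connects on the fourth attempt.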
Thin out RtcSession's stateJob to forward all SFU socket-state transitions directly to Call.reconnect(); the unified retry loop now owns escalation logic (FAST -> REJOIN, MIGRATE -> REJOIN).
- Remove sfuConnectionRetryCount / MAX_SFU_CONNECTION_RETRIES; retry counting is handled by the reconnect loop in Call.
- Replace onSignalingLost callback with onSfuApiError (maps SFU error codes to strategies) and onSfuNetworkFailure (always FAST).
- Wire SfuConnectionModule to use RetryableSignalingServiceDecorator and the new dual-callback interface.
- Delete the now-unused SignalLostSignalingServiceDecorator.
Made-with: Cursor
- SfuConnectionRetryTest: replace per-strategy and retry-counter tests with forwarding tests that verify stateJob delegates each strategy to Call.reconnect().
- ReconnectAttemptsCountTest: test FAST (no increment), REJOIN (increments), and accumulated attempts through the unified loop.
- FailedSfuIdsTest: use addFailedSfuId directly instead of calling migrate(), which now requires full session setup.
- JoinCallTest: skip network-dependent latency test.
Made-with: Cursor
…nnected
The pre-loop guard in Call.reconnect() blocked MIGRATE (and all other strategies) when the call was in Connected state. This prevented both server-initiated migration (GoAway/error event with MIGRATE strategy) and debug-triggered migration from executing. Align with JS SDK: only skip reconnect when already RECONNECTING, MIGRATING, or RECONNECTING_FAILED, exactly matching the JS guard. Remove the Connected and Disconnected checks entirely.
Made-with: Cursor
- Set Reconnecting/Migrating state before acquiring reconnectMutex so concurrent callers (stateJob, NetworkStateListener) see it and skip
- Launch call.reconnect() from stateJob in a separate coroutine so it survives stateJob cancellation during prepareReconnect()
- Simplify fastReconnect: connect synchronously, wait for Connected state, then restore session; this removes the serialProcessor indirection
- Remove ParticipantMigrationComplete await in migration flow to avoid unnecessary synchronization latency
- Make prepareReconnect() explicitly disconnect the old SFU socket before reconnecting to prevent stale-socket state machine errors
- Promote socketListenerJob to class field in SfuSocket and clean up old WebSocket on reconnect to prevent leaked connections
- Move pre-reconnect stats collection before prepareReconnect() so stats are sent while the connection is still alive
Made-with: Cursor
Prevent false disconnect signals during network switches (e.g. cellular→WiFi) by checking actual connectivity in NetworkStateProvider.onLost instead of unconditionally marking the network as down. Add defense-in-depth guards throughout the reconnect pipeline:
- Call.kt: leave timer checks connection state before executing; reconnect loop uses tryLock to avoid queuing redundant attempts
- RtcSession: centralize cleanup in cancelActiveWork(); add network-aware guards on SFU error/state callbacks; handle DisconnectedPermanently with escalation to rejoin; fast reconnect throws on stale peer connections instead of calling rejoin directly
- HealthMonitor: skip reconnect attempts when network is unavailable
- SfuSocketStateService: NetworkDisconnected stays parked on socket errors to avoid futile retry loops; handles NetworkAvailable for recovery
Made-with: Cursor
PR checklist ✅All required conditions are satisfied:
🎉 Great job! This PR is ready for review. |
Walkthrough
This pull request refactors the SFU reconnection system to use a unified
Changes
Sequence Diagram(s)
sequenceDiagram
participant Client as Call
participant StratSel as NetworkStateListener
participant RC as Reconnect Loop
participant RtcSes as RtcSession
participant Decorator as RetryableSignalingServiceDecorator
participant SFU as SFU Service
StratSel->>RC: onConnected() → select strategy (FAST/REJOIN)
RC->>RC: reconnect(strategy, reason)
loop Retry Loop (with backoff)
RC->>RtcSes: reconnectFast/rejoin/migrate(reason)
alt FAST Path
RtcSes->>Decorator: setPublisher/iceTrickle etc.
Decorator->>SFU: call with retry policy
SFU-->>Decorator: response or error
Decorator->>Decorator: check should_retry & classification
alt Transient Error
Decorator->>Decorator: delay & retry
else Terminal/SIGNAL_LOST
Decorator->>RC: invoke onTerminalError callback
end
else REJOIN Path
RtcSes->>RtcSes: prepareRejoin(reason)
RtcSes->>Decorator: sendAnswer/updateSubscriptions
Decorator->>SFU: retry transient failures
end
RC->>RC: check escalation conditions (timeout/attempt threshold)
alt Escalate to REJOIN/MIGRATE
RC->>RC: switch strategy
else Success
RC->>RC: return
else Max Retries Exhausted
RC->>RC: set ReconnectingFailed state
end
end
sequenceDiagram
participant SfuSock as SfuSocket
participant StateServ as SfuSocketStateService
participant NetProv as NetworkStateProvider
participant Call as Call.reconnect
SfuSock->>StateServ: SfuSocketStateEvent.Disconnected
alt DisconnectedTemporarily with reconnectStrategy
StateServ->>NetProv: check isConnected()
alt Network Available
NetProv-->>StateServ: true
StateServ->>Call: reconnect(strategy, reason)
else Network Down
NetProv-->>StateServ: false
StateServ->>StateServ: defer reconnect
end
else DisconnectedPermanently
StateServ->>Call: reconnect(REJOIN, reason)
end
alt NetworkAvailable Event (after network restored)
StateServ->>Call: reconnect(FAST, reason)
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/connection/Subscriber.kt (1)
333-346: ⚠️ Potential issue | 🟠 Major
Canceling ICE retries on every error is too broad.
Line 345 cancels scheduled ICE restarts unconditionally, but Lines 333-335 document terminal-error behavior. This can stop recovery on transient failures. Please gate cancellation to terminal signaling errors only (e.g., participant-not-found / non-retryable).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/connection/Subscriber.kt` around lines 333 - 346, The onError handler in restartIce() currently cancels retries unconditionally; change it to inspect the error (the exception passed as it) from wrapAPICall and only call restartIceJobDelegate.cancelScheduledRestartIce() for terminal/non-retryable signaling errors (e.g., check for PARTICIPANT_NOT_FOUND, a non-retryable error code or a specific exception type/message), while leaving retries intact for transient failures; keep tracer.trace("iceRestart-error", ...) but wrap the cancel call in a conditional that detects those terminal error cases (refer to restartIce(), wrapAPICall, sfuClient.iceRestart, tracer.trace and restartIceJobDelegate.cancelScheduledRestartIce()).
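A minimal sketch of the gating this comment asks for, under stated assumptions: `SignalingError`, `IceRestartScheduler`, and `onIceRestartError` are hypothetical stand-ins for the SDK's error types, `restartIceJobDelegate`, and the `onError` handler, and the real terminal/transient classification may differ.

```kotlin
// Hypothetical error model; the SDK's real error types and codes may differ.
sealed class SignalingError(message: String) : Exception(message) {
    class Terminal(val code: String) : SignalingError("terminal: $code")
    class Transient(reason: String) : SignalingError("transient: $reason")
}

// Hypothetical stand-in for restartIceJobDelegate.
class IceRestartScheduler {
    var scheduled = true
        private set

    fun cancelScheduledRestartIce() {
        scheduled = false
    }
}

// Always trace the error, but cancel scheduled ICE restarts only for
// terminal (non-retryable) signaling errors.
fun onIceRestartError(error: Throwable, scheduler: IceRestartScheduler, trace: (String) -> Unit) {
    trace("iceRestart-error: ${error.message}")
    if (error is SignalingError.Terminal) {
        scheduler.cancelScheduledRestartIce()
    }
}
```

Transient failures leave the scheduled restart in place, so recovery can continue on the next retry.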
🧹 Nitpick comments (2)
stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/reconnect/FailedSfuIdsTest.kt (1)
129-139: Rename the test to match what it now verifies.
The body no longer performs multiple migrate() calls, so the current name is misleading.
✏️ Suggested rename
- fun `failed SFU IDs accumulate across multiple migrate calls`() = runTest {
+ fun `failed SFU IDs accumulate across multiple additions`() = runTest {
Based on learnings: Applies to /src/test//*.{kt,kts} : Use descriptive backtick test names (e.g., fun `joining a call publishes participant tracks`()).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/reconnect/FailedSfuIdsTest.kt` around lines 129 - 139, Rename the test function `failed SFU IDs accumulate across multiple migrate calls` to reflect its current behavior: it verifies accumulation after multiple add operations rather than multiple migrate() calls; update the backtick name to something like `failed SFU IDs accumulate when adding multiple failed IDs` or `failed SFU IDs accumulate across multiple addFailedSfuId calls` so the function name (the test identifier) matches the assertions around call.invokeAddFailedSfuId and call.getFailedSfuIds.
stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/retry/StreamRetryPolicy.kt (1)
47-67: Consider: exponential implements additive growth, not true exponential backoff.
The formula prev + retry * backoffStepMillis produces quadratic growth (e.g., 250→750→1500→2500), not exponential (e.g., 250→500→1000→2000). If true exponential is intended:
nextBackOffDelayFunction = { _, prev -> (prev * 2).coerceAtMost(maxBackoffMillis) }
If the current behavior is intentional for compatibility with stream-core-android, a KDoc note clarifying the growth pattern would help future maintainers.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/retry/StreamRetryPolicy.kt` around lines 47 - 67, The current exponential(...) factory in StreamRetryPolicy uses an additive formula (prev + retry * backoffStepMillis) which yields quadratic growth; replace nextBackOffDelayFunction with a true exponential step: compute a base = if (prev <= 0) backoffStepMillis else prev, then next = (base * 2).coerceAtMost(maxBackoffMillis) and ensure it is coerced into the [backoffStepMillis, maxBackoffMillis] range; update the nextBackOffDelayFunction in the StreamRetryPolicy.exponential(...) call accordingly so retries grow exponentially (and keep giveUpFunction and other params unchanged).
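To make the difference concrete, the following sketch computes both delay sequences side by side. It is illustrative only: `additiveDelays` and `doublingDelays` are hypothetical helpers, not part of `StreamRetryPolicy`; it assumes the first delay starts from a previous delay of 0 and that both variants are capped at the maximum. The doubling variant starts at the step value so that it reproduces the 250→500→1000→2000 example quoted above.

```kotlin
// Additive formula from the comment: next = prev + retry * step (quadratic growth).
fun additiveDelays(stepMillis: Long, maxMillis: Long, retries: Int): List<Long> {
    var prev = 0L
    return (1..retries).map { retry ->
        prev = (prev + retry * stepMillis).coerceAtMost(maxMillis)
        prev
    }
}

// Doubling variant: the first delay is the step, each later delay doubles the previous one.
// coerceIn requires stepMillis <= maxMillis.
fun doublingDelays(stepMillis: Long, maxMillis: Long, retries: Int): List<Long> {
    var prev = 0L
    return (1..retries).map {
        val next = if (prev <= 0L) stepMillis else prev * 2
        prev = next.coerceIn(stepMillis, maxMillis)
        prev
    }
}

fun main() {
    println(additiveDelays(250, 5_000, 4)) // [250, 750, 1500, 2500]
    println(doublingDelays(250, 5_000, 4)) // [250, 500, 1000, 2000]
}
```

Note that a plain `prev * 2` never leaves 0 when the initial previous delay is 0, which is why the doubling variant seeds the first delay with the step.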
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/rtc/SfuConnectionRetryTest.kt`:
- Around line 336-357: Rename the test function to reflect that
DisconnectedPermanently will trigger a reconnect regardless of network state;
specifically update the test named `DisconnectedPermanently does not trigger
reconnect when network is down` to something like `DisconnectedPermanently
triggers reconnect even when network is down`, keeping the existing setup that
mocks `mockNetworkStateProvider.isConnected()` to return false, creates the RTC
session via `createRtcSession()`, emits
`SfuSocketState.Disconnected.DisconnectedPermanently` into `socketStateFlow`,
and verifies `mockCall.reconnect` is invoked with
`WebsocketReconnectStrategy.WEBSOCKET_RECONNECT_STRATEGY_REJOIN`.
---
Outside diff comments:
In
`@stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/connection/Subscriber.kt`:
- Around line 333-346: The onError handler in restartIce() currently cancels
retries unconditionally; change it to inspect the error (the exception passed as
it) from wrapAPICall and only call
restartIceJobDelegate.cancelScheduledRestartIce() for terminal/non-retryable
signaling errors (e.g., check for PARTICIPANT_NOT_FOUND, a non-retryable error
code or a specific exception type/message), while leaving retries intact for
transient failures; keep tracer.trace("iceRestart-error", ...) but wrap the
cancel call in a conditional that detects those terminal error cases (refer to
restartIce(), wrapAPICall, sfuClient.iceRestart, tracer.trace and
restartIceJobDelegate.cancelScheduledRestartIce()).
---
Nitpick comments:
In
`@stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/retry/StreamRetryPolicy.kt`:
- Around line 47-67: The current exponential(...) factory in StreamRetryPolicy
uses an additive formula (prev + retry * backoffStepMillis) which yields
quadratic growth; replace nextBackOffDelayFunction with a true exponential step:
compute a base = if (prev <= 0) backoffStepMillis else prev, then next = (base *
2).coerceAtMost(maxBackoffMillis) and ensure it is coerced into the
[backoffStepMillis, maxBackoffMillis] range; update the nextBackOffDelayFunction
in the StreamRetryPolicy.exponential(...) call accordingly so retries grow
exponentially (and keep giveUpFunction and other params unchanged).
In
`@stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/reconnect/FailedSfuIdsTest.kt`:
- Around line 129-139: Rename the test function `failed SFU IDs accumulate
across multiple migrate calls` to reflect its current behavior: it verifies
accumulation after multiple add operations rather than multiple migrate() calls;
update the backtick name to something like `failed SFU IDs accumulate when
adding multiple failed IDs` or `failed SFU IDs accumulate across multiple
addFailedSfuId calls` so the function name (the test identifier) matches the
assertions around call.invokeAddFailedSfuId and call.getFailedSfuIds.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 77038151-b2b5-42b0-bd92-ece63b714c5e
📒 Files selected for processing (28)
- stream-video-android-core/api/stream-video-android-core.api
- stream-video-android-core/build.gradle.kts
- stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt
- stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt
- stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt
- stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/connection/Publisher.kt
- stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/connection/Subscriber.kt
- stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/utils/RetryableSignalingServiceDecorator.kt
- stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/utils/SignalLostSignalingServiceDecorator.kt
- stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/internal/module/SfuConnectionModule.kt
- stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/internal/network/NetworkStateProvider.kt
- stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/retry/StreamRetryAttemptInfo.kt
- stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/retry/StreamRetryPolicy.kt
- stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/retry/StreamRetryProcessor.kt
- stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/socket/common/HealthMonitor.kt
- stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/socket/sfu/SfuSocket.kt
- stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/socket/sfu/SfuSocketStateService.kt
- stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/trace/SignalingServiceTracerDecorator.kt
- stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/call/utils/RetryableSignalingServiceDecoratorTest.kt
- stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/reconnect/FailedSfuIdsTest.kt
- stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/reconnect/ReconnectAttemptsCountTest.kt
- stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/retry/StreamRetryPolicyTest.kt
- stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/retry/StreamRetryProcessorTest.kt
- stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/rtc/JoinCallTest.kt
- stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/rtc/ReconnectDetailsTest.kt
- stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/rtc/ReconnectEscalationTest.kt
- stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/rtc/SfuConnectionRetryTest.kt
- stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/trace/SignalingServiceTracerDecoratorKtTest.kt
💤 Files with no reviewable changes (1)
- stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/utils/SignalLostSignalingServiceDecorator.kt
  @Test
- fun `DISCONNECT strategy disconnects immediately without retries`() = runTest {
+ fun `DisconnectedPermanently does not trigger reconnect when network is down`() = runTest {
+     every { mockNetworkStateProvider.isConnected() } returns false
      createRtcSession()
      advance()

-     socketStateFlow.value = SfuSocketState.Disconnected.DisconnectedTemporarily(
+     socketStateFlow.value = SfuSocketState.Disconnected.DisconnectedPermanently(
          error = Error.NetworkError(
-             message = "SFU wants disconnect",
-             serverErrorCode = 500,
-             statusCode = 500,
+             message = "Socket closed (1002)",
+             serverErrorCode = 1002,
+             statusCode = 1002,
          ),
-         reconnectStrategy = WebsocketReconnectStrategy.WEBSOCKET_RECONNECT_STRATEGY_DISCONNECT,
      )
      advance()

-     coVerify(exactly = 1) { mockSocket.disconnect() }
-     coVerify(exactly = 0) { mockCall.migrate() }
-     coVerify(exactly = 0) { mockCall.rejoin(any()) }
+     coVerify(exactly = 1) {
+         mockCall.reconnect(
+             WebsocketReconnectStrategy.WEBSOCKET_RECONNECT_STRATEGY_REJOIN,
+             any(),
+         )
+     }
  }
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check the actual RtcSession implementation to verify expected behavior for DisconnectedPermanently when network is down
ast-grep --pattern $'DisconnectedPermanently -> {
$$$
}'
Repository: GetStream/stream-video-android
Length of output: 56
🏁 Script executed:
#!/bin/bash
# Find RtcSession implementation file
find . -type f -name "RtcSession.kt" | head -5
Repository: GetStream/stream-video-android
Length of output: 168
🏁 Script executed:
#!/bin/bash
# Look at the test file to understand context around the failing test and similar tests
cd stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/rtc
ls -la | grep -i test
Repository: GetStream/stream-video-android
Length of output: 509
🏁 Script executed:
#!/bin/bash
# Search for how DisconnectedPermanently is handled in the codebase
rg "DisconnectedPermanently" --type kt -B 3 -A 3 | head -100
Repository: GetStream/stream-video-android
Length of output: 100
🏁 Script executed:
#!/bin/bash
# Search for DisconnectedPermanently with correct kotlin file type
rg "DisconnectedPermanently" --type kotlin -B 3 -A 3 | head -150
Repository: GetStream/stream-video-android
Length of output: 21522
🏁 Script executed:
#!/bin/bash
# Read the test file to see the full test context and similar tests
cat -n stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/rtc/SfuConnectionRetryTest.kt | sed -n '290,370p'
Repository: GetStream/stream-video-android
Length of output: 2869
🏁 Script executed:
#!/bin/bash
# Read RtcSession implementation to understand reconnect logic
wc -l stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt
Repository: GetStream/stream-video-android
Length of output: 171
🏁 Script executed:
#!/bin/bash
# Read relevant parts of RtcSession.kt to understand reconnect logic
cat -n stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt | head -100
Repository: GetStream/stream-video-android
Length of output: 6212
🏁 Script executed:
#!/bin/bash
# Search for the reconnect logic and network check handling
rg "DisconnectedPermanently\|isConnected\(\)" stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt -B 2 -A 2
Repository: GetStream/stream-video-android
Length of output: 56
🏁 Script executed:
#!/bin/bash
# Search for socket state handling in RtcSession
rg "socketStateFlow\|DisconnectedPermanently" stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt -B 2 -A 5 | head -80
Repository: GetStream/stream-video-android
Length of output: 56
🏁 Script executed:
#!/bin/bash
# Look for the reconnect function and network-aware logic
rg "fun reconnect|networkStateProvider|isConnected" stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt -B 2 -A 3 | head -100
Repository: GetStream/stream-video-android
Length of output: 890
🏁 Script executed:
#!/bin/bash
# Find the complete socket state handling block for DisconnectedPermanently
rg "DisconnectedPermanently" stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt -B 1 -A 10
Repository: GetStream/stream-video-android
Length of output: 661
🏁 Script executed:
#!/bin/bash
# Get the context around the socket state flow collection in RtcSession
sed -n '400,600p' stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt | cat -n
Repository: GetStream/stream-video-android
Length of output: 8799
🏁 Script executed:
#!/bin/bash
# Get full socket state handling to see all the branches
sed -n '200,350p' stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt | cat -n
Repository: GetStream/stream-video-android
Length of output: 7413
🏁 Script executed:
#!/bin/bash
# Find the state job that collects socket state changes
rg "stateJob|socketStateFlow.collect" stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt -B 2 -A 30 | head -150
Repository: GetStream/stream-video-android
Length of output: 6306
🏁 Script executed:
#!/bin/bash
# Double-check by viewing the exact DisconnectedPermanently handling again to confirm no network check
sed -n '230,280p' stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt | grep -A 15 "DisconnectedPermanently"
Repository: GetStream/stream-video-android
Length of output: 56
Test name contradicts assertion — rename test to match actual behavior.
The test assertion is correct: DisconnectedPermanently intentionally triggers reconnect with REJOIN strategy regardless of network state. However, the test name "does not trigger reconnect when network is down" is misleading.
The RtcSession implementation shows:
- DisconnectedTemporarily includes a network check: if (!networkStateProvider.isConnected()) { return@collect }
- DisconnectedPermanently has no network check and always calls reconnect() with WEBSOCKET_RECONNECT_STRATEGY_REJOIN
The test correctly verifies this behavior, but the name should reflect that DisconnectedPermanently escalates regardless of network availability:
- fun `DisconnectedPermanently does not trigger reconnect when network is down`() = runTest {
+ fun `DisconnectedPermanently escalates to REJOIN even when network is down`() = runTest {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/rtc/SfuConnectionRetryTest.kt`
around lines 336 - 357, Rename the test function to reflect that
DisconnectedPermanently will trigger a reconnect regardless of network state;
specifically update the test named `DisconnectedPermanently does not trigger
reconnect when network is down` to something like `DisconnectedPermanently
triggers reconnect even when network is down`, keeping the existing setup that
mocks `mockNetworkStateProvider.isConnected()` to return false, creates the RTC
session via `createRtcSession()`, emits
`SfuSocketState.Disconnected.DisconnectedPermanently` into `socketStateFlow`,
and verifies `mockCall.reconnect` is invoked with
`WebsocketReconnectStrategy.WEBSOCKET_RECONNECT_STRATEGY_REJOIN`.
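The behavior this comment describes (a network check for temporary disconnects, unconditional REJOIN for permanent ones) can be sketched as a pure function. This is a simplified model, not the `RtcSession` code: `ReconnectStrategy`, `SocketState`, and `strategyFor` are hypothetical names, and the real `stateJob` forwards the result to `Call.reconnect()` from a coroutine.

```kotlin
enum class ReconnectStrategy { FAST, REJOIN, MIGRATE }

// Hypothetical, reduced model of the SFU socket states discussed above.
sealed interface SocketState {
    data class DisconnectedTemporarily(val strategy: ReconnectStrategy) : SocketState
    object DisconnectedPermanently : SocketState
}

// Returns the strategy to forward to the reconnect entry point, or null to skip.
fun strategyFor(state: SocketState, isNetworkConnected: () -> Boolean): ReconnectStrategy? =
    when (state) {
        // Temporary disconnects are deferred while the network is down.
        is SocketState.DisconnectedTemporarily ->
            if (isNetworkConnected()) state.strategy else null
        // Permanent disconnects always escalate to REJOIN, with no network check.
        SocketState.DisconnectedPermanently -> ReconnectStrategy.REJOIN
    }
```

Under this model the test in question corresponds to `strategyFor(SocketState.DisconnectedPermanently) { false }`, which yields REJOIN even though the network is reported as down.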
@CodeRabbit review specifically for unhandled exceptions and coroutine misuse in this PR
@coderabbitai review specifically for unhandled exceptions and coroutine misuse in this PR
🔴 Critical1.
| # | File | Severity | Issue |
|---|---|---|---|
| 1 | Call.kt ~L895 | 🔴 Critical | catch (Exception) swallows CancellationException in reconnect loop |
| 2 | Call.kt ~L987, ~L1045 | 🔴 Critical | session.value!! force-unwrap TOCTOU race in reconnectRejoin/reconnectMigrate |
| 3 | RtcSession.kt | 🟠 Significant | launch { reconnect() } with no CoroutineExceptionHandler |
| 4 | NetworkStateProvider.kt | 🟠 Significant | Listener loop in scope.launch{} with no error handler |
| 5 | SfuSocket.kt | 🟠 Significant | Flow .onEach{}.launchIn() missing .catch{} |
| 6 | Call.kt ~L386 | 🟡 Minor | leaveTimeoutAfterDisconnect launch has no error handler |
| 7 | Call.kt ~L1928–52 | 🟡 Minor | Debug launches with no error handler |
| 8 | Call.kt | 🟡 Minor | throw Exception(...) loses typed error info |
| 9 | RetryableSignalingServiceDecorator.kt | 🟡 Minor | onNetworkFailure throw can suppress original exception |
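Row 1 of the table refers to a common Kotlin pitfall: a broad `catch (Exception)` also catches `CancellationException`, so a cancelled retry loop keeps running. A minimal sketch of the usual remedy follows; `attemptSafely` is a hypothetical helper, not code from `Call.kt`. It uses `java.util.concurrent.CancellationException`, which is the type that kotlinx.coroutines' `CancellationException` aliases on the JVM.

```kotlin
import java.util.concurrent.CancellationException

// Runs one reconnect attempt. Ordinary failures are reported as `false` so the
// caller can retry; cancellation is rethrown so the enclosing loop stops.
fun attemptSafely(block: () -> Unit): Boolean =
    try {
        block()
        true
    } catch (e: CancellationException) {
        // Must come before the generic catch: cancellation is not a retryable failure.
        throw e
    } catch (e: Exception) {
        false
    }
```

The order of the `catch` clauses matters: with the generic clause first, the cancellation branch would never be reached.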
@coderabbitai review in this PR. Can you review for the following
1.
|
| Usage type | Examples |
|---|---|
| Upward method calls | call.reconnect(...), call.rejoin(...), call.onSfuConnectionEstablished() |
| State mutation | call.state._connection.value = RealtimeConnection.Connected (Line ~668, 680) |
| Resource access | call.peerConnectionFactory, call.mediaManager, call.clientCapabilities |
| Scope borrowing | call.scope.launch { call.reconnect(...) } |
This creates a circular dependency: Call → RtcSession → Call. The direction of data and command flow is ambiguous — both objects own each other.
Suggested fix: Define a narrow upward-communication interface and pass only that to RtcSession:
// New interface: only what RtcSession needs to tell Call
internal interface RtcSessionDelegate {
    val connectionState: StateFlow<RealtimeConnection>
    val ownCapabilities: StateFlow<List<OwnCapability>>
    suspend fun onSfuConnected()
    suspend fun reconnect(strategy: WebsocketReconnectStrategy, reason: String)
    fun getOrCreateParticipant(sessionId: String): ParticipantState
    // ...
}
// Call implements it:
internal class Call : RtcSessionDelegate { ... }
// RtcSession receives only the interface:
internal class RtcSession(
    private val delegate: RtcSessionDelegate, // ← narrow contract, not the full Call
    ...
)
State mutations like call.state._connection.value = ... should move entirely into Call, triggered by callbacks/events raised by RtcSession:
// RtcSession signals upward:
delegate.onSfuConnected()
// Call handles the state mutation in its own layer:
override suspend fun onSfuConnected() {
    state._connection.value = RealtimeConnection.Connected
}
2. SfuConnectionModule lambdas capture call: implicit back-reference
File: RtcSession.kt Lines ~254–269
private val sfuConnectionModuleProvider: () -> SfuConnectionModule = {
    SfuConnectionModule(
        ...
        onSfuApiError = { error ->
            if (call.state.connection.value is RealtimeConnection.Disconnected) return@SfuConnectionModule
            call.scope.launch {
                call.reconnect(strategy, "SfuApiError:...") // ← call captured in lambda
            }
        },
        onSfuNetworkFailure = { throwable ->
            if (call.state.connection.value is RealtimeConnection.Disconnected) return@SfuConnectionModule
            call.scope.launch {
                call.reconnect(...) // ← call captured in lambda
            }
        },
    )
}
SfuConnectionModule should be completely unaware of Call. The lambdas passed to it are closures over call, making the module indirectly depend on the top-level Call. This also means the module holds a strong reference to Call for its lifetime.
Suggested fix: Keep the lambda abstraction but resolve call only at the RtcSession→delegate boundary:
onSfuApiError = { error ->
    if (delegate.connectionState.value is RealtimeConnection.Disconnected) return@SfuConnectionModule
    coroutineScope.launch {
        delegate.reconnect(strategy, "SfuApiError:...") // ← delegate, not call
    }
},
3. Call.Debug is a public nested class holding val call: Call
File: Call.kt Line 1917
public class Debug(val call: Call) { // ← val call is public
    fun rejoin(reason: String = "Debug") {
        call.scope.launch { call.rejoin(reason) }
    }
    fun migrate() {
        call.scope.launch { call.migrate() }
    }
    fun fastReconnect(reason: String = "Debug") {
        call.scope.launch { call.fastReconnect(reason) }
    }
    // ...
}
- val call is public, so any consumer that holds a Debug instance can reach the entire Call object, including internal state, session, media tracks, etc.
- The nested class is tightly coupled to the outer class through call.scope.launch {}, which is the outer class's lifecycle object.
- If a Debug instance is retained anywhere (e.g. a test framework, a UI debug panel), it prevents garbage collection of Call.
Suggested fix: Replace the Call reference with typed callback parameters:
public class Debug(
    private val onRejoin: (reason: String) -> Unit,
    private val onMigrate: () -> Unit,
    private val onFastReconnect: (reason: String) -> Unit,
    private val onDisableSubscriber: () -> Unit,
    private val onEnableSubscriber: () -> Unit,
    private val onRestartSubscriberIce: () -> Unit,
    private val onRestartPublisherIce: () -> Unit,
    private val onSimulateSfuFull: () -> Unit,
) {
    fun rejoin(reason: String = "Debug") = onRejoin(reason)
    fun migrate() = onMigrate()
    fun fastReconnect(reason: String = "Debug") = onFastReconnect(reason)
}
And build it at the Call layer:
val debug = Debug(
    onRejoin = { reason -> scope.launch { rejoin(reason) } },
    onMigrate = { scope.launch { migrate() } },
    onFastReconnect = { reason -> scope.launch { fastReconnect(reason) } },
    // ...
)
This way Debug holds no knowledge of Call at all.
4. RtcSession directly mutates Call's internal state — ownership violation
File: RtcSession.kt Lines ~668, 680
call.state._connection.value = RealtimeConnection.Connected // inside RtcSession
call.state._connection.value = RealtimeConnection.Reconnecting
_connection is a MutableStateFlow that belongs to Call. Mutating it from RtcSession means two separate classes share ownership of the same state. The state diagram becomes impossible to reason about: either class can change the connection state at any time.
Suggested fix: RtcSession should emit an event/callback; Call alone should translate that into a state update:
// In RtcSession, signal upward:
delegate.onSfuConnected()
// In Call, which owns the mutation:
override suspend fun onSfuConnected() {
    state._connection.value = RealtimeConnection.Connected
}
Dependency direction summary
Current (problematic):
Call ──creates──▶ RtcSession
▲ │
└────call.*──────────┘ ← circular
Target (unidirectional):
Call ──creates──▶ RtcSession
│ │
│ implements │ calls via
▼ ▼
RtcSessionDelegate (interface)
🧠 Learnings used
Learnt from: aleksandar-apostolov
Repo: GetStream/stream-video-android PR: 1587
File: stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt:795-798
Timestamp: 2025-12-19T14:10:16.577Z
Learning: In the `rejoin()` function in `stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt`, the design invariant is that `rejoin()` should only be called when there is an active session (`this.session != null`). Therefore, using `this.session!!` is safe and intentional.
Learnt from: CR
Repo: GetStream/stream-video-android PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-19T09:15:37.269Z
Learning: Applies to **/*.{kt,kts} : Ensure cleanup/teardown paths handle cancellation and failure (important for sockets, queues, retries)
Learnt from: rahul-lohra
Repo: GetStream/stream-video-android PR: 1614
File: stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallBusyHandler.kt:40-44
Timestamp: 2026-02-18T12:33:02.111Z
Learning: In the `CallBusyHandler` class in `stream-video-android-core`, fire-and-forget rejection calls (using `streamVideo.scope.launch` to call `reject(RejectReason.Busy)`) without explicit error handling are acceptable for the busy call rejection feature.
Learnt from: CR
Repo: GetStream/stream-video-android PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-19T09:15:37.269Z
Learning: Applies to **/*.{kt,kts} : Keep concurrency deterministic—use structured coroutines and avoid global scope
Learnt from: CR
Repo: GetStream/stream-video-android PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-19T09:15:37.269Z
Learning: Applies to **/*.{kt,kts} : Keep critical RTC paths off the main thread; prefer coroutines with structured scopes
 * this to [finalizeMigration] because the event loop is still needed while
 * the new session is being established.
 */
private fun cancelActiveWork(cancelEventJob: Boolean = true) {
Question: the old prepareRejoin() also called sfuConnectionModule.socketConnection.disconnect() and closed publisher/subscriber peer connections. Those calls aren't in cancelActiveWork() — are they handled by every caller separately, or were they intentionally dropped?
The socket disconnect and publisher/subscriber close were not dropped; they're in cleanup(), which is called by each path that needs full teardown:
- Rejoin: prepareRejoin() → cancelActiveWork() → leaveWithReason() → cleanup() (full teardown)
- Migrate: enterMigration() → cancelActiveWork(cancelEventJob=false), then finalizeMigration() → cleanup() (deferred teardown: keeps the event loop alive until the new session connects)
- Fast reconnect: prepareReconnect() → cancelActiveWork() + socket disconnect only. Peer connections are intentionally kept open since fast reconnect reuses them on the same SFU.
cancelActiveWork() is intentionally limited to cancelling jobs and the serial processor. It's the common subset shared by all three paths. The socket/peer-connection teardown varies per strategy, so it stays in the caller-specific methods.
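The split between the shared cancelActiveWork() step and the per-strategy teardown can be sketched as follows. This is a minimal illustration, not the SDK code: SessionSketch and the recorded step names are hypothetical, and only the call order comes from the reply above.

```kotlin
// Hypothetical stand-in for RtcSession; it only records the order of teardown steps.
class SessionSketch {
    val steps = mutableListOf<String>()

    // Common subset: cancel jobs and the serial processor, nothing else.
    private fun cancelActiveWork(cancelEventJob: Boolean = true) {
        steps += if (cancelEventJob) "cancelJobs(all)" else "cancelJobs(keepEventJob)"
    }

    // Full teardown: socket disconnect plus publisher/subscriber close.
    private fun cleanup() {
        steps += "disconnectSocket"
        steps += "closePeerConnections"
    }

    // Rejoin: full teardown right away.
    fun prepareRejoin() {
        cancelActiveWork()
        steps += "leave"
        cleanup()
    }

    // Migrate: keep the event job alive, defer cleanup() to finalizeMigration().
    fun enterMigration() = cancelActiveWork(cancelEventJob = false)
    fun finalizeMigration() = cleanup()

    // Fast reconnect: socket only, peer connections stay open.
    fun prepareReconnect() {
        cancelActiveWork()
        steps += "disconnectSocket"
    }
}

fun main() {
    println(SessionSketch().apply { prepareReconnect() }.steps)
    println(SessionSketch().apply { prepareRejoin() }.steps)
}
```

Keeping the shared step this narrow means a new strategy only has to decide what extra teardown it needs, rather than opting out of teardown it does not want.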
…nto fix/cleanup-old-rtc-session-on-migration
…igrate now throw ReconnectPreconditionException instead of IllegalStateException. Catch block now has three layers in order:
1. CancellationException → re-throw (coroutine cancellation)
2. ReconnectPreconditionException → log + set ReconnectingFailed + break (terminal, no retry)
3. Exception → retry with escalation (transient failures)
…ectOutcome Replace ReconnectPreconditionException and try-catch control flow in the reconnect loop with a sealed class ReconnectOutcome (Success, PreconditionNotMet, PeerConnectionStale, Failed). Each reconnect method now returns an outcome instead of throwing, and the loop dispatches via an exhaustive `when` — the compiler enforces that every case is handled. Made-with: Cursor
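A minimal sketch of the sealed-outcome dispatch this commit describes. The four outcome names come from the commit message; the Failed payload, the NextStep enum, and the mapping in nextStep() are assumptions made for illustration only.

```kotlin
// Outcome names are taken from the commit message; the payload is an assumption.
sealed class ReconnectOutcome {
    object Success : ReconnectOutcome()
    object PreconditionNotMet : ReconnectOutcome()
    object PeerConnectionStale : ReconnectOutcome()
    data class Failed(val cause: String) : ReconnectOutcome()
}

// Hypothetical next step chosen by the reconnect loop.
enum class NextStep { DONE, GIVE_UP, ESCALATE_TO_REJOIN, RETRY }

// `when` used as an expression over a sealed class must be exhaustive,
// so adding a new outcome breaks compilation until it is handled here.
fun nextStep(outcome: ReconnectOutcome): NextStep = when (outcome) {
    ReconnectOutcome.Success -> NextStep.DONE
    ReconnectOutcome.PreconditionNotMet -> NextStep.GIVE_UP
    ReconnectOutcome.PeerConnectionStale -> NextStep.ESCALATE_TO_REJOIN
    is ReconnectOutcome.Failed -> NextStep.RETRY
}

fun main() {
    println(nextStep(ReconnectOutcome.PeerConnectionStale))
    println(nextStep(ReconnectOutcome.Failed("socket timeout")))
}
```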
…aths Add SfuConnectionResult sealed class in RtcSession so fastReconnect and connectAndAwait return typed outcomes instead of throwing. The public connect() now delegates to connectAndAwait, eliminating duplicated request-building, tracing, and socket-await logic. fastReconnect is reduced to its unique pre/post work (peer-connection health check, subscription restore). Call.kt maps SfuConnectionResult → ReconnectOutcome without try-catch. Made-with: Cursor
this.session.value = newSession

return try {
    val result = newSession.internalConnect(reconnectDetails, currentOptions)
Nothing in the function signature signals that it can throw an exception. The signature is dishonest, and this does not seem scalable.
I will try to look for a way to resolve this.
A quick way to resolve this would be to rename the method to something like
newSession.internalConnectWithException, so the caller knows to check inside for exceptions.
Changing the internalConnect method so that it does not throw anything.
We need a try-finally here because finalizeMigration() must run even on cancellation. internalConnect can still throw a CancellationException, in which case we should still run the migration cleanup.
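The point about cancellation can be illustrated with a small sketch. MigrationSketch and its log are hypothetical; the only claim is the language behavior that a finally block runs when a CancellationException propagates, and that the exception still reaches the caller.

```kotlin
import kotlin.coroutines.cancellation.CancellationException

// Hypothetical stand-in for the migrate path; `log` records what ran.
class MigrationSketch {
    val log = mutableListOf<String>()

    private fun finalizeMigration() {
        log += "finalized"
    }

    // Even if connect() returns typed results for ordinary failures,
    // cancellation still arrives as an exception, so cleanup lives in finally.
    fun migrate(connect: () -> String): String =
        try {
            connect()
        } finally {
            finalizeMigration()
        }
}

fun main() {
    val sketch = MigrationSketch()
    try {
        sketch.migrate { throw CancellationException("scope cancelled") }
    } catch (e: CancellationException) {
        sketch.log += "cancellation propagated"
    }
    println(sketch.log)
}
```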
…nternalConnect in _join
- Rename leaveWithReason to sendLeaveEvent and make it a suspend fun that awaits the send instead of fire-and-forget via launch. This fixes the race where cleanup() cancelled the supervisor job before the leave message was sent.
- Move sendLeaveEvent call in Call.internalLeave inside the clientImpl.scope.launch block so it completes before cleanup().
- Reorder prepareRejoin: sendLeaveEvent before cancelActiveWork/cleanup.
- Deprecate RtcSession.connect() in favor of internalConnect() which returns SfuConnectionResult instead of throwing.
- Update Call._join to use internalConnect directly, replacing the try-catch block with an exhaustive when on SfuConnectionResult.
- Split SfuConnectionResult into two sealed classes: SfuConnectionResult (Connected/Failed) for internalConnect, FastReconnectResult (Connected/PeerConnectionStale/Failed) for fastReconnect.
- Remove redundant try-catch blocks around joinRequest and collectStats in reconnectFast/reconnectRejoin/reconnectMigrate.
- Replace withTimeout with withTimeoutOrNull in internalConnect to correctly distinguish timeouts from external cancellation.
- Remove unused CancellationException import from Call.kt.
Made-with: Cursor
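The withTimeout to withTimeoutOrNull change can be sketched like this, assuming kotlinx.coroutines is on the classpath. ConnectResult and connectWithin are hypothetical names, not the SDK's SfuConnectionResult or internalConnect. withTimeoutOrNull returns null only when its own timeout fires, while cancellation of the surrounding scope still propagates as a CancellationException, so the two cases no longer look alike to the caller.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical result type for illustration.
sealed interface ConnectResult {
    object Connected : ConnectResult
    data class Failed(val reason: String) : ConnectResult
}

// Waits for `awaitConnected` to finish, mapping a timeout to a typed failure.
suspend fun connectWithin(timeoutMs: Long, awaitConnected: suspend () -> Unit): ConnectResult {
    val connected = withTimeoutOrNull(timeoutMs) {
        awaitConnected()
        true
    }
    return if (connected == true) {
        ConnectResult.Connected
    } else {
        ConnectResult.Failed("timed out after ${timeoutMs}ms")
    }
}

fun main() = runBlocking {
    println(connectWithin(50) { delay(10) } is ConnectResult.Connected)
    println(connectWithin(50) { delay(1_000) } is ConnectResult.Failed)
}
```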
…etwork availability
After a successful fast reconnect, the publisher and subscriber ICE connections may be stale because the underlying network path changed (e.g. WiFi ↔ cellular). This adds explicit ICE restarts for both directions so fresh candidates are gathered and media resumes. Also introduces a network availability check in the Call.reconnect() loop to avoid burning limited FAST attempt budgets when the network is down. Skipped attempts are not counted, preserving the full retry budget for when connectivity returns.
Key changes:
- RtcSession.restartIceAfterFastReconnect() restarts publisher and subscriber ICE after a successful fast reconnect
- RtcSession.fastReconnect() now uses isClosed() instead of isFailedOrClosed() so FAILED peer connections (recoverable via ICE restart) are not prematurely escalated to REJOIN
- StreamPeerConnection.isClosed() distinguishes truly CLOSED (needs REJOIN) from FAILED (recoverable)
- Call.reconnect() skips FAST attempts when network is unavailable
- Call.collectStats() wrapped in runCatching to avoid crashes during reconnection
- Companion object constants documented with KDoc
- Tests added/updated for all new behavior
Made-with: Cursor
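A small sketch of the isClosed() versus isFailedOrClosed() distinction. PcState is a local stand-in, not the WebRTC library enum, and strategyAfterHealthCheck is a hypothetical helper; only the rule that CLOSED needs REJOIN while FAILED stays on the FAST path comes from the commit message.

```kotlin
// Local stand-in for peer connection states; not the WebRTC library type.
enum class PcState { NEW, CONNECTING, CONNECTED, DISCONNECTED, FAILED, CLOSED }

enum class Strategy { FAST, REJOIN }

// Old check: FAILED was treated like CLOSED and escalated.
fun isFailedOrClosed(state: PcState) = state == PcState.FAILED || state == PcState.CLOSED

// New check: only a truly CLOSED connection is unusable.
fun isClosed(state: PcState) = state == PcState.CLOSED

// Hypothetical health check run before a fast reconnect proceeds.
fun strategyAfterHealthCheck(publisher: PcState, subscriber: PcState): Strategy =
    if (isClosed(publisher) || isClosed(subscriber)) Strategy.REJOIN else Strategy.FAST

fun main() {
    // FAILED is left to the ICE restart that follows a successful fast reconnect.
    println(strategyAfterHealthCheck(PcState.FAILED, PcState.CONNECTED))
    println(strategyAfterHealthCheck(PcState.CLOSED, PcState.CONNECTED))
}
```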
… reconnect loop bugs
Strip self-reconnection logic from SfuSocket so it becomes a passive state reporter. All SFU socket state changes (WebSocketEventLost, DisconnectedTemporarily, NetworkDisconnected) now route through RtcSession.stateJob into Call.reconnect(), eliminating the race condition that caused double-joins and SCHEDULED_CLEANUP peer drops.
Key changes:
- Remove SfuSocket's networkStateListener and self-reconnect paths
- Route WebSocketEventLost from RtcSession.stateJob to Call.reconnect()
- Guard prepareReconnect() against disconnecting an already-disconnected socket
- Fix strategy downgrade bug: REJOIN no longer falls back to FAST
- Increment reconnectAttempts only for REJOIN/MIGRATE (not FAST/UNSPECIFIED)
- Leave the call when ReconnectingFailed is reached (prevent zombie calls)
- Remove redundant inner-loop network guard (entry-point checks suffice)
- Deprecate RestartConnection, NetworkAvailable, and RestartReason for binary compatibility
- Add high-level reconnection logic documentation
Made-with: Cursor
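The escalation and counting rules from these commits can be sketched as a synchronous loop. This is an illustration under assumptions: ReconnectLoopSketch is hypothetical, an AtomicBoolean stands in for the coroutine Mutex and its tryLock(), and the attempt callback replaces the real FAST/REJOIN/MIGRATE work. The limits (3 FAST attempts, 10 reconnect attempts) are the values named in the earlier commit message.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

enum class Strategy { FAST, REJOIN, MIGRATE }
enum class State { CONNECTED, RECONNECTING_FAILED }

const val MAX_FAST_RECONNECT_ATTEMPTS = 3
const val MAX_RECONNECT_ATTEMPTS = 10

class ReconnectLoopSketch(private val attempt: (Strategy) -> Boolean) {
    // Non-reentrant try-lock; a stand-in for a coroutine Mutex used via tryLock().
    private val busy = AtomicBoolean(false)
    val tried = mutableListOf<Strategy>()

    /** Returns null when another reconnect is already running (the trigger is dropped). */
    fun reconnect(initial: Strategy): State? {
        if (!busy.compareAndSet(false, true)) return null
        try {
            var strategy = initial
            var fastAttempts = 0
            var reconnectAttempts = 0
            while (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) {
                // FAST has its own budget; only REJOIN/MIGRATE count as reconnect attempts.
                if (strategy == Strategy.FAST) fastAttempts++ else reconnectAttempts++
                tried += strategy
                if (attempt(strategy)) return State.CONNECTED
                strategy = when (strategy) {
                    Strategy.FAST ->
                        if (fastAttempts >= MAX_FAST_RECONNECT_ATTEMPTS) Strategy.REJOIN else Strategy.FAST
                    // REJOIN never downgrades to FAST; MIGRATE failures escalate to REJOIN.
                    Strategy.REJOIN, Strategy.MIGRATE -> Strategy.REJOIN
                }
            }
            return State.RECONNECTING_FAILED
        } finally {
            busy.set(false)
        }
    }
}

fun main() {
    val loop = ReconnectLoopSketch { it == Strategy.REJOIN }
    println(loop.reconnect(Strategy.FAST))
    println(loop.tried)
}
```

Dropping a concurrent trigger instead of queuing it keeps a burst of socket and network callbacks from scheduling several reconnects back to back.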
…nto fix/cleanup-old-rtc-session-on-migration


Goal
Unify the reconnect pipeline and harden network transition handling.
Summary
- Replaces the fastReconnect()/rejoin()/migrate() call sites with a single Call.reconnect(strategy, reason) entry point that owns all retry logic and strategy escalation (FAST → REJOIN, MIGRATE → REJOIN)
- Adds cancelActiveWork() to prevent stale background jobs from triggering unwanted reconnects on old sessions
- Adds RetryableSignalingServiceDecorator with StreamRetryPolicy/StreamRetryProcessor for robust SFU API call retries with proper error propagation
Implementation
Unified reconnect loop (Call.kt)
Previously, fastReconnect(), rejoin(), and migrate() were independent methods scheduled through StreamSingleFlightProcessorImpl. Multiple concurrent triggers (stateJob, NetworkStateListener, SfuSocket errors, HealthMonitor) could race and queue conflicting reconnect paths.
Now all callers funnel through reconnect(strategy, reason), which:
- Uses a Mutex via tryLock(): concurrent triggers are dropped, not queued
- Treats PeerConnectionNotUsableException (stale WebRTC peer connections) as an escalation trigger
- Exits on Connected, ReconnectingFailed, Disconnected, or max attempts
Network transition reliability (NetworkStateProvider, Call.kt)
- onLost now checks actual connectivity before signaling disconnect, which prevents false events during transport handoffs (OS releases cellular after WiFi is already active)
- onDisconnected guards against leaving when the connection has already recovered
RtcSession cleanup (RtcSession.kt)
- cancelActiveWork() centralizes cancellation of stateJob, eventJob, muteStateSyncJob, participantsMonitoringJob, and serialProcessor; every teardown path (migration, reconnect, rejoin, leave) goes through it
- DisconnectedTemporarily → call.reconnect(strategy), DisconnectedPermanently → call.reconnect(REJOIN): no more inline escalation logic or retry counters
- fastReconnect uses withTimeout + state().first { Connected } instead of callback-based whenConnected; throws PeerConnectionNotUsableException on stale peer connections instead of calling rejoin directly
- enterMigration() → new session connect → finalizeMigration(): old peer connections stay alive during transition
SFU API retry layer (RetryableSignalingServiceDecorator)
- Replaces SignalLostSignalingServiceDecorator with a decorator that retries transient SFU API failures via StreamRetryProcessor
- SFU API errors → onSfuApiError (maps to reconnect strategy), network failures → onSfuNetworkFailure (triggers FAST reconnect)
- StreamRetryPolicy and StreamRetryProcessor provide configurable linear/exponential backoff
Socket layer hardening (HealthMonitor, SfuSocket, SfuSocketStateService)
- HealthMonitor accepts isNetworkAvailable lambda and skips reconnect when network is down
- SfuSocketStateService: NetworkDisconnected stays parked on socket errors (no futile retry loops); handles NetworkAvailable event for recovery
- SfuSocket: cancels stale socketListenerJob on reconnect; cleans up old WebSocket before creating new one
Testing
- ReconnectEscalationTest: verifies FAST → REJOIN escalation, PeerConnectionNotUsableException handling, and mutex non-deadlock
- SfuConnectionRetryTest: stateJob forwarding, network-aware guards, DisconnectedPermanently escalation
- ReconnectAttemptsCountTest: reconnect attempt counter increments correctly through unified loop
- ReconnectDetailsTest: reconnect details populated correctly for FAST/REJOIN/MIGRATE
- RetryableSignalingServiceDecoratorTest: retry behavior, terminal error propagation, network failure callbacks
- StreamRetryPolicyTest, StreamRetryProcessorTest: policy configuration and processor retry semantics
- SignalingServiceTracerDecoratorKtTest: tracing decorator integration
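The "configurable linear/exponential backoff" in the SFU API retry layer can be sketched as a delay schedule. BackoffPolicy and its parameters are hypothetical; the PR text does not show the actual StreamRetryPolicy fields, so this illustrates only the general technique.

```kotlin
// Hypothetical policy shape; not the SDK's StreamRetryPolicy.
data class BackoffPolicy(
    val maxRetries: Int,
    val initialDelayMs: Long,
    val maxDelayMs: Long,
    val nextDelay: (previousMs: Long) -> Long,
) {
    companion object {
        // Delay grows by a fixed step each retry.
        fun linear(maxRetries: Int, stepMs: Long, maxDelayMs: Long) =
            BackoffPolicy(maxRetries, stepMs, maxDelayMs) { prev -> prev + stepMs }

        // Delay is multiplied by `factor` each retry.
        fun exponential(maxRetries: Int, initialDelayMs: Long, maxDelayMs: Long, factor: Double = 2.0) =
            BackoffPolicy(maxRetries, initialDelayMs, maxDelayMs) { prev -> (prev * factor).toLong() }
    }

    /** Delay before each retry, capped at maxDelayMs. */
    fun delays(): List<Long> {
        val out = mutableListOf<Long>()
        var current = initialDelayMs
        repeat(maxRetries) {
            out += current.coerceAtMost(maxDelayMs)
            current = nextDelay(current)
        }
        return out
    }
}

fun main() {
    println(BackoffPolicy.exponential(maxRetries = 5, initialDelayMs = 100, maxDelayMs = 1_000).delays())
    // prints [100, 200, 400, 800, 1000]
    println(BackoffPolicy.linear(maxRetries = 4, stepMs = 250, maxDelayMs = 10_000).delays())
    // prints [250, 500, 750, 1000]
}
```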
Summary by CodeRabbit
New Features
- ReconnectingFailed connection state to indicate when reconnection attempts are exhausted.
Bug Fixes
Tests