Unify reconnect pipeline and harden network transition handling #1645

Open
PratimMallick wants to merge 30 commits into develop from
fix/cleanup-old-rtc-session-on-migration
PratimMallick commented Apr 15, 2026

Goal

Unify the reconnect pipeline and harden network transition handling.

Summary

  • Replaces the scattered fastReconnect() / rejoin() / migrate() call sites with a single Call.reconnect(strategy, reason) entry point that owns all retry logic and strategy escalation (FAST → REJOIN, MIGRATE → REJOIN)
  • Fixes call drops during network transitions (cellular ↔ WiFi) caused by false disconnect signals and stale leave timers
  • Centralizes RtcSession teardown via cancelActiveWork() to prevent stale background jobs from triggering unwanted reconnects on old sessions
  • Adds RetryableSignalingServiceDecorator with StreamRetryPolicy / StreamRetryProcessor for robust SFU API call retries with proper error propagation

Implementation

Unified reconnect loop (Call.kt)

Previously, fastReconnect(), rejoin(), and migrate() were independent methods scheduled through StreamSingleFlightProcessorImpl. Multiple concurrent triggers (stateJob, NetworkStateListener, SfuSocket errors, HealthMonitor) could race and queue conflicting reconnect paths.
Now all callers funnel through reconnect(strategy, reason) which:

  • Acquires a Mutex via tryLock() — concurrent triggers are dropped, not queued
  • Runs a retry loop with automatic escalation: FAST → REJOIN after 3 attempts or deadline, MIGRATE → REJOIN on failure
  • Handles PeerConnectionNotUsableException (stale WebRTC peer connections) as an escalation trigger
  • Exits cleanly on Connected, ReconnectingFailed, Disconnected, or max attempts
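
The loop described above can be sketched roughly as follows. This is a simplified, non-coroutine model and not the code in Call.kt: `Strategy`, `Outcome`, and `ReconnectLoop` are hypothetical names, an `AtomicBoolean` stands in for the coroutine `Mutex.tryLock()`, and the FAST deadline and `PeerConnectionNotUsableException` handling are left out. The default limits (3 and 10) are taken from the commit message later in this PR.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-ins; the SDK uses WebsocketReconnectStrategy and RealtimeConnection states.
enum class Strategy { FAST, REJOIN, MIGRATE }
enum class Outcome { CONNECTED, FAILED, DROPPED }

class ReconnectLoop(
    private val maxFastAttempts: Int = 3, // MAX_FAST_RECONNECT_ATTEMPTS in the PR
    private val maxAttempts: Int = 10,    // MAX_RECONNECT_ATTEMPTS in the PR
    private val attempt: (Strategy) -> Boolean, // returns true once connected
) {
    // Plays the role of reconnectMutex.tryLock() in this simplified model.
    private val busy = AtomicBoolean(false)

    // Strategies tried so far, recorded only for illustration.
    val tried = mutableListOf<Strategy>()

    fun reconnect(initial: Strategy): Outcome {
        // A trigger that arrives while a loop is already running is dropped, not queued.
        if (!busy.compareAndSet(false, true)) return Outcome.DROPPED
        try {
            var strategy = initial
            var fastAttempts = 0
            repeat(maxAttempts) {
                tried += strategy
                if (attempt(strategy)) return Outcome.CONNECTED
                // Escalation: FAST -> REJOIN after maxFastAttempts, MIGRATE -> REJOIN on failure.
                strategy = when (strategy) {
                    Strategy.FAST ->
                        if (++fastAttempts >= maxFastAttempts) Strategy.REJOIN else Strategy.FAST
                    Strategy.MIGRATE -> Strategy.REJOIN
                    Strategy.REJOIN -> Strategy.REJOIN
                }
            }
            // Corresponds to setting the ReconnectingFailed state.
            return Outcome.FAILED
        } finally {
            busy.set(false)
        }
    }
}
```

Dropping rather than queuing means a burst of triggers from stateJob, NetworkStateListener, and HealthMonitor results in one loop run instead of several back-to-back reconnects.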

Network transition reliability (NetworkStateProvider, Call.kt)

  • onLost now checks actual connectivity before signaling disconnect — prevents false events during transport handoffs (OS releases cellular after WiFi is already active)
  • Leave timer in onDisconnected guards against leaving when the connection has already recovered
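
A minimal model of the onLost change, assuming the provider re-checks connectivity and only notifies listeners when the overall state actually changes. `NetworkStateModel` and `hasActiveNetwork` are hypothetical; the real NetworkStateProvider queries Android's connectivity APIs, which are not reproduced here.

```kotlin
// Hypothetical model: `hasActiveNetwork` stands in for a real connectivity query.
class NetworkStateModel(private val hasActiveNetwork: () -> Boolean) {
    var isConnected = true
        private set

    // Events that would be delivered to listeners, recorded for illustration.
    val events = mutableListOf<String>()

    // Losing one transport is not treated as a disconnect by itself.
    fun onLost() = notifyIfChanged()

    fun onAvailable() = notifyIfChanged()

    private fun notifyIfChanged() {
        val nowConnected = hasActiveNetwork()
        if (nowConnected == isConnected) return // e.g. cellular released while WiFi is up
        isConnected = nowConnected
        events += if (nowConnected) "connected" else "disconnected"
    }
}
```

In this model, an onLost callback for cellular while WiFi is already active produces no event, so no leave timer or reconnect is started for a connection that never went away.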

RtcSession cleanup (RtcSession.kt)

  • cancelActiveWork() centralizes cancellation of stateJob, eventJob, muteStateSyncJob, participantsMonitoringJob, and serialProcessor — every teardown path (migration, reconnect, rejoin, leave) goes through it
  • stateJob is now a thin forwarding layer: DisconnectedTemporarily → call.reconnect(strategy), DisconnectedPermanently → call.reconnect(REJOIN); no more inline escalation logic or retry counters
  • Network-aware guards skip reconnect triggers when the network is down or the call is already disconnected
  • fastReconnect uses withTimeout + state().first { Connected } instead of callback-based whenConnected; throws PeerConnectionNotUsableException on stale peer connections instead of calling rejoin directly
  • Migration follows a phased approach: enterMigration() → new session connect → finalizeMigration() — old peer connections stay alive during transition

SFU API retry layer (RetryableSignalingServiceDecorator)

  • Replaces SignalLostSignalingServiceDecorator with a decorator that retries transient SFU API failures via StreamRetryProcessor
  • Separates error propagation: SFU response errors → onSfuApiError (maps to reconnect strategy), network failures → onSfuNetworkFailure (triggers FAST reconnect)
  • New StreamRetryPolicy and StreamRetryProcessor provide configurable linear/exponential backoff
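
The retry layer can be pictured with the sketch below. It is a blocking simplification under stated assumptions: `RetryPolicy` and `retrying` are hypothetical shapes, not the signatures of StreamRetryPolicy or StreamRetryProcessor, which are suspend-based; coroutine cancellation is out of scope here.

```kotlin
// Hypothetical, simplified policy: how many retries, how delays grow, when to give up.
class RetryPolicy(
    val maxRetries: Int,
    val initialDelayMillis: Long,
    val nextDelay: (retry: Int, previous: Long) -> Long,
    val giveUp: (Throwable) -> Boolean = { false },
)

// Runs `block` until it succeeds, the policy gives up, or retries are exhausted.
// `sleep` is injectable so the loop can be exercised without real waiting.
fun <T> retrying(
    policy: RetryPolicy,
    sleep: (Long) -> Unit = { Thread.sleep(it) },
    block: (attempt: Int) -> T,
): T {
    var delayMillis = policy.initialDelayMillis
    var attempt = 1
    while (true) {
        try {
            return block(attempt)
        } catch (e: Exception) {
            // Terminal errors and exhausted retries propagate to the caller.
            if (attempt > policy.maxRetries || policy.giveUp(e)) throw e
            sleep(delayMillis)
            delayMillis = policy.nextDelay(attempt, delayMillis)
            attempt++
        }
    }
}
```

A linear policy in this model would pass `nextDelay = { _, prev -> prev + step }`. The `giveUp` predicate is where a terminal SFU response error would be separated from a transient network failure.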

Socket layer hardening (HealthMonitor, SfuSocket, SfuSocketStateService)

  • HealthMonitor accepts isNetworkAvailable lambda — skips reconnect when network is down
  • SfuSocketStateService: NetworkDisconnected stays parked on socket errors (no futile retry loops); handles NetworkAvailable event for recovery
  • SfuSocket: cancels stale socketListenerJob on reconnect; cleans up old WebSocket before creating new one
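
The parked NetworkDisconnected behavior and the HealthMonitor gate can be illustrated with a small state model. All names here (`SocketState`, `SocketEvent`, `transition`, `HealthCheck`) are hypothetical, and the state entered on recovery is an assumption; the real SfuSocketStateService has more states and events.

```kotlin
// Hypothetical, reduced state and event sets for illustration only.
enum class SocketState { CONNECTED, RECONNECTING, NETWORK_DISCONNECTED }
enum class SocketEvent { SOCKET_ERROR, NETWORK_LOST, NETWORK_AVAILABLE, CONNECTED }

fun transition(state: SocketState, event: SocketEvent): SocketState = when (event) {
    SocketEvent.NETWORK_LOST -> SocketState.NETWORK_DISCONNECTED
    // Recovery is driven by the network coming back, not by socket errors.
    SocketEvent.NETWORK_AVAILABLE ->
        if (state == SocketState.NETWORK_DISCONNECTED) SocketState.RECONNECTING else state
    // While the network is down, socket errors leave the state parked.
    SocketEvent.SOCKET_ERROR ->
        if (state == SocketState.NETWORK_DISCONNECTED) state else SocketState.RECONNECTING
    SocketEvent.CONNECTED -> SocketState.CONNECTED
}

// Health check that skips reconnecting when the network is known to be down.
class HealthCheck(
    private val isNetworkAvailable: () -> Boolean,
    private val reconnect: () -> Unit,
) {
    fun onHeartbeatMissed() {
        if (isNetworkAvailable()) reconnect()
    }
}
```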

Testing

  • ReconnectEscalationTest — verifies FAST → REJOIN escalation, PeerConnectionNotUsableException handling, and mutex non-deadlock
  • SfuConnectionRetryTest — stateJob forwarding, network-aware guards, DisconnectedPermanently escalation
  • ReconnectAttemptsCountTest — reconnect attempt counter increments correctly through unified loop
  • ReconnectDetailsTest — reconnect details populated correctly for FAST/REJOIN/MIGRATE
  • RetryableSignalingServiceDecoratorTest — retry behavior, terminal error propagation, network failure callbacks
  • StreamRetryPolicyTest, StreamRetryProcessorTest — policy configuration and processor retry semantics
  • SignalingServiceTracerDecoratorKtTest — tracing decorator integration
  • Manual: cellular → WiFi and WiFi → cellular transitions verified stable with no call drops

☑️Contributor Checklist

General

  • I have signed the Stream CLA (required)
  • Assigned a person / code owner group (required)
  • Thread with the PR link started in a respective Slack channel (required internally)
  • PR targets the develop branch
  • PR is linked to the GitHub issue it resolves

Code & documentation

  • Changelog is updated with client-facing changes
  • New code is covered by unit tests
  • Comparison screenshots added for visual changes
  • Affected documentation updated (KDocs, docusaurus, tutorial)
  • Tutorial starter kit updated
  • Examples/guides starter kits updated (stream-video-examples)

☑️Reviewer Checklist

  • XML sample runs & works
  • Compose sample runs & works
  • Tutorial starter kit
  • Example starter kits work
  • UI Changes correct (before & after images)
  • Bugs validated (bugfixes)
  • New feature tested and works
  • Release notes and docs clearly describe changes
  • All code we touched has new or updated KDocs
  • Check the SDK Size Comparison table in the CI logs

Summary by CodeRabbit

  • New Features

    • Added ReconnectingFailed connection state to indicate when reconnection attempts are exhausted.
  • Bug Fixes

    • Improved reconnection robustness with automatic retry logic for transient failures.
    • Enhanced network state handling to conditionally manage reconnection attempts based on connectivity.
    • Better error recovery for signaling and SFU communication failures.
  • Tests

    • Added comprehensive test coverage for reconnection escalation, retry behavior, and network state scenarios.

…d tracer improvements

Replace SignalLostSignalingServiceDecorator with RetryableSignalingServiceDecorator
that uses configurable retry policies (exponential backoff) and separates terminal
SFU errors from transient network failures. Refactor SignalingServiceTracerDecorator
to use a generic traced() helper for consistent request/response/exception tracing.
Clean up Publisher/Subscriber error handling and fix sourceSets configuration.

Made-with: Cursor
Replace immediate teardown of the old RtcSession during migration with a
phased approach: enterMigration() keeps media flowing while listening for
ParticipantMigrationCompleteEvent from the old SFU; finalizeMigration()
tears down only after the handoff is confirmed (or timed out at 7s).

Refactor prepareRejoin(reason) to send final stats, leave gracefully, and
cleanup via the unified cleanup() path. Move mediaScope.cancel() from
leaveWithReason() into cleanup() so it is always invoked regardless of
the teardown path (rejoin, migrate, or explicit leave).

Made-with: Cursor
Consolidate fastReconnect, rejoin, and migrate into a single
Call.reconnect(strategy, reason) entry point with a retry loop that
mirrors the JS SDK:

- FAST reconnect is attempted up to MAX_FAST_RECONNECT_ATTEMPTS (3),
  then escalates to REJOIN.
- MIGRATE failures also escalate to REJOIN.
- A global reconnectMutex replaces the old schedule()/SingleFlight
  coalescing to provide mutual exclusion across all strategies.
- MAX_RECONNECT_ATTEMPTS (10) and leaveAfterDisconnectSeconds act
  as circuit breakers; exhaustion sets ReconnectingFailed state.
- Public fastReconnect(), rejoin(), migrate() wrappers are preserved
  for backward compatibility.

Made-with: Cursor
Thin out RtcSession's stateJob to forward all SFU socket-state
transitions directly to Call.reconnect() — the unified retry loop now
owns escalation logic (FAST -> REJOIN, MIGRATE -> REJOIN).

- Remove sfuConnectionRetryCount / MAX_SFU_CONNECTION_RETRIES; retry
  counting is handled by the reconnect loop in Call.
- Replace onSignalingLost callback with onSfuApiError (maps SFU error
  codes to strategies) and onSfuNetworkFailure (always FAST).
- Wire SfuConnectionModule to use RetryableSignalingServiceDecorator
  and the new dual-callback interface.
- Delete the now-unused SignalLostSignalingServiceDecorator.

Made-with: Cursor
- SfuConnectionRetryTest: replace per-strategy and retry-counter tests
  with forwarding tests that verify stateJob delegates each strategy
  to Call.reconnect().
- ReconnectAttemptsCountTest: test FAST (no increment), REJOIN
  (increments), and accumulated attempts through the unified loop.
- FailedSfuIdsTest: use addFailedSfuId directly instead of calling
  migrate() which now requires full session setup.
- JoinCallTest: skip network-dependent latency test.

Made-with: Cursor
…nnected

The pre-loop guard in Call.reconnect() blocked MIGRATE (and all other
strategies) when the call was in Connected state. This prevented both
server-initiated migration (GoAway/error event with MIGRATE strategy)
and debug-triggered migration from executing.

Align with JS SDK: only skip reconnect when already RECONNECTING,
MIGRATING, or RECONNECTING_FAILED — exactly matching the JS guard.
Remove the Connected and Disconnected checks entirely.

Made-with: Cursor
- Set Reconnecting/Migrating state before acquiring reconnectMutex so
  concurrent callers (stateJob, NetworkStateListener) see it and skip
- Launch call.reconnect() from stateJob in a separate coroutine so it
  survives stateJob cancellation during prepareReconnect()
- Simplify fastReconnect: connect synchronously, wait for Connected
  state, then restore session — removes serialProcessor indirection
- Remove ParticipantMigrationComplete await in migration flow to avoid
  unnecessary synchronization latency
- Make prepareReconnect() explicitly disconnect the old SFU socket
  before reconnecting to prevent stale-socket state machine errors
- Promote socketListenerJob to class field in SfuSocket and clean up
  old WebSocket on reconnect to prevent leaked connections
- Move pre-reconnect stats collection before prepareReconnect() so
  stats are sent while the connection is still alive

Made-with: Cursor
Prevent false disconnect signals during network switches (e.g. cellular→WiFi)
by checking actual connectivity in NetworkStateProvider.onLost instead of
unconditionally marking the network as down. Add defense-in-depth guards
throughout the reconnect pipeline:

- Call.kt: leave timer checks connection state before executing; reconnect
  loop uses tryLock to avoid queuing redundant attempts
- RtcSession: centralize cleanup in cancelActiveWork(); add network-aware
  guards on SFU error/state callbacks; handle DisconnectedPermanently
  with escalation to rejoin; fast reconnect throws on stale peer connections
  instead of calling rejoin directly
- HealthMonitor: skip reconnect attempts when network is unavailable
- SfuSocketStateService: NetworkDisconnected stays parked on socket errors
  to avoid futile retry loops; handles NetworkAvailable for recovery

Made-with: Cursor
@PratimMallick PratimMallick requested a review from a team as a code owner April 15, 2026 13:02
github-actions Bot commented Apr 15, 2026

PR checklist ✅

All required conditions are satisfied:

  • Title length is OK (or ignored by label).
  • At least one pr: label exists.
  • Sections ### Goal, ### Implementation, and ### Testing are filled.

🎉 Great job! This PR is ready for review.

@PratimMallick PratimMallick added the pr:improvement Enhances an existing feature or code label Apr 15, 2026
coderabbitai Bot commented Apr 15, 2026

Walkthrough

This pull request refactors the SFU reconnection system to use a unified reconnect(strategy, reason) loop with automatic retry logic and escalation semantics, replacing individual fastReconnect(), rejoin(), and migrate() paths. It introduces retry infrastructure, new connection state ReconnectingFailed, and updates signaling-service decoration and network-state handling accordingly.

Changes

| Cohort | File(s) | Summary |
|---|---|---|
| API and State | api/stream-video-android-core.api, src/main/kotlin/.../CallState.kt | Removed CallKt.sfuReconnectTimeoutMillis; added RealtimeConnection$ReconnectingFailed public class and state object for terminal reconnection failures. |
| Core Reconnection Logic | src/main/kotlin/.../Call.kt | Replaced keyed scheduling with a unified reconnect(strategy, reason) loop; added retry caps, escalation logic (FAST→REJOIN, MIGRATE→REJOIN), and the ReconnectingFailed terminal state; kept public entry points as wrappers. Updated Debug.rejoin() signature to accept an optional reason. |
| RtcSession and Connection Prep | src/main/kotlin/.../call/RtcSession.kt | Removed SFU retry state and escalation branching; replaced with SFU connection callbacks (onSfuApiError, onSfuNetworkFailure) invoking call.reconnect(...); added enterMigration() and finalizeMigration(); made prepareRejoin() and prepareReconnect() suspend; introduced PeerConnectionNotUsableException; refactored the fastReconnect connection flow with timeout and subscription re-enable. |
| Retry Infrastructure | src/main/kotlin/.../retry/StreamRetryPolicy.kt, src/main/kotlin/.../retry/StreamRetryProcessor.kt, src/main/kotlin/.../retry/StreamRetryAttemptInfo.kt | Added a retry policy/processor framework with exponential/linear/fixed backoff factories, giveUp predicates, and a suspend-based retry loop that handles transient failures and propagates cancellation. |
| Signaling Service Decoration | src/main/kotlin/.../call/utils/RetryableSignalingServiceDecorator.kt, src/main/kotlin/.../call/utils/SignalLostSignalingServiceDecorator.kt | Added RetryableSignalingServiceDecorator to wrap signaling calls with automatic retry logic and error classification (terminal vs. network failure); removed SignalLostSignalingServiceDecorator. |
| SFU Connection Module and Callbacks | src/main/kotlin/.../internal/module/SfuConnectionModule.kt | Updated constructor to accept onSfuApiError and onSfuNetworkFailure callbacks instead of onSignalingLost; wired RetryableSignalingServiceDecorator with error-routing logic. |
| SFU Socket and State Management | src/main/kotlin/.../socket/sfu/SfuSocket.kt, src/main/kotlin/.../socket/sfu/SfuSocketStateService.kt | Added persistent socketListenerJob with lifecycle cleanup; passed networkStateProvider to HealthMonitor; updated socket state transitions to handle NetworkAvailable events and route DisconnectedTemporarily/DisconnectedPermanently to call.reconnect(...); changed error logging to debug level. |
| Network and Health Monitoring | src/main/kotlin/.../socket/common/HealthMonitor.kt, src/main/kotlin/.../internal/network/NetworkStateProvider.kt | Added isNetworkAvailable callback to HealthMonitor to gate reconnect attempts; updated NetworkStateProvider.onLost() to delegate to notifyListenersIfNetworkStateChanged() for conditional state-change events. |
| Call Connection Handling | src/main/kotlin/.../call/connection/Publisher.kt, src/main/kotlin/.../call/connection/Subscriber.kt | Removed explicit no-op branches in Publisher.negotiate() error handling; added ICE-restart cancellation in Subscriber.restartIce() on signaling errors. |
| Tracing | src/main/kotlin/.../trace/SignalingServiceTracerDecorator.kt | Refactored per-method tracing into a shared traced(...) helper; added exception-level tracing for CancellationException passthrough and other throwables; extended coverage to stopNoiseCancellation. |
| Build Configuration | build.gradle.kts | Restricted Kotlin services directory generation to the main source set only. |
| Test Coverage | src/test/kotlin/.../retry/StreamRetryPolicyTest.kt, src/test/kotlin/.../retry/StreamRetryProcessorTest.kt, src/test/kotlin/.../call/utils/RetryableSignalingServiceDecoratorTest.kt, src/test/kotlin/.../rtc/ReconnectDetailsTest.kt, src/test/kotlin/.../rtc/ReconnectEscalationTest.kt, src/test/kotlin/.../rtc/SfuConnectionRetryTest.kt, src/test/kotlin/.../rtc/JoinCallTest.kt, src/test/kotlin/.../reconnect/* | Added test suites validating retry policies, processor behavior, decorator logic, reconnection strategy escalation, and SFU connection state transitions; updated existing tests to use the new unified API; marked the latency test as @Ignore due to its network dependency. |

Sequence Diagram(s)

sequenceDiagram
    participant Client as Call
    participant StratSel as NetworkStateListener
    participant RC as Reconnect Loop
    participant RtcSes as RtcSession
    participant Decorator as RetryableSignalingServiceDecorator
    participant SFU as SFU Service
    
    StratSel->>RC: onConnected() → select strategy (FAST/REJOIN)
    RC->>RC: reconnect(strategy, reason)
    loop Retry Loop (with backoff)
        RC->>RtcSes: reconnectFast/rejoin/migrate(reason)
        alt FAST Path
            RtcSes->>Decorator: setPublisher/iceTrickle etc.
            Decorator->>SFU: call with retry policy
            SFU-->>Decorator: response or error
            Decorator->>Decorator: check should_retry & classification
            alt Transient Error
                Decorator->>Decorator: delay & retry
            else Terminal/SIGNAL_LOST
                Decorator->>RC: invoke onTerminalError callback
            end
        else REJOIN Path
            RtcSes->>RtcSes: prepareRejoin(reason)
            RtcSes->>Decorator: sendAnswer/updateSubscriptions
            Decorator->>SFU: retry transient failures
        end
        RC->>RC: check escalation conditions (timeout/attempt threshold)
        alt Escalate to REJOIN/MIGRATE
            RC->>RC: switch strategy
        else Success
            RC->>RC: return
        else Max Retries Exhausted
            RC->>RC: set ReconnectingFailed state
        end
    end
sequenceDiagram
    participant SfuSock as SfuSocket
    participant StateServ as SfuSocketStateService
    participant NetProv as NetworkStateProvider
    participant Call as Call.reconnect
    
    SfuSock->>StateServ: SfuSocketStateEvent.Disconnected
    alt DisconnectedTemporarily with reconnectStrategy
        StateServ->>NetProv: check isConnected()
        alt Network Available
            NetProv-->>StateServ: true
            StateServ->>Call: reconnect(strategy, reason)
        else Network Down
            NetProv-->>StateServ: false
            StateServ->>StateServ: defer reconnect
        end
    else DisconnectedPermanently
        StateServ->>Call: reconnect(REJOIN, reason)
    end
    
    alt NetworkAvailable Event (after network restored)
        StateServ->>Call: reconnect(FAST, reason)
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐰 Through networks lost and signals crossed,

Our fluffy retry loop knows no cost—

With exponential hops and escalation bold,

We reconnect when storms unfold!

FAST then REJOIN, the strategy dance,

No dropped call gets a second chance! 🎯

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 19.01% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description check ✅ Passed The PR description covers Goal, Implementation, Testing, and checklist sections, though some checklist items remain unchecked and UI Changes section is empty (expected for non-UI changes).
Title check ✅ Passed The PR title clearly summarizes the main architectural change: unifying the reconnect pipeline and hardening network transition handling across the SDK.

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/connection/Subscriber.kt (1)

333-346: ⚠️ Potential issue | 🟠 Major

Canceling ICE retries on every error is too broad.

Line 345 cancels scheduled ICE restarts unconditionally, but Lines 333-335 document terminal-error behavior. This can stop recovery on transient failures. Please gate cancellation to terminal signaling errors only (e.g., participant-not-found / non-retryable).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/connection/Subscriber.kt`
around lines 333 - 346, The onError handler in restartIce() currently cancels
retries unconditionally; change it to inspect the error (the exception passed as
it) from wrapAPICall and only call
restartIceJobDelegate.cancelScheduledRestartIce() for terminal/non-retryable
signaling errors (e.g., check for PARTICIPANT_NOT_FOUND, a non-retryable error
code or a specific exception type/message), while leaving retries intact for
transient failures; keep tracer.trace("iceRestart-error", ...) but wrap the
cancel call in a conditional that detects those terminal error cases (refer to
restartIce(), wrapAPICall, sfuClient.iceRestart, tracer.trace and
restartIceJobDelegate.cancelScheduledRestartIce()).
🧹 Nitpick comments (2)
stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/reconnect/FailedSfuIdsTest.kt (1)

129-139: Rename the test to match what it now verifies.

The body no longer performs multiple migrate() calls, so the current name is misleading.

✏️ Suggested rename
-    fun `failed SFU IDs accumulate across multiple migrate calls`() = runTest {
+    fun `failed SFU IDs accumulate across multiple additions`() = runTest {

Based on learnings: Applies to **/src/test/**/*.{kt,kts} : Use descriptive backtick test names (e.g., fun `joining a call publishes participant tracks`()).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/reconnect/FailedSfuIdsTest.kt`
around lines 129 - 139, Rename the test function `failed SFU IDs accumulate
across multiple migrate calls` to reflect its current behavior: it verifies
accumulation after multiple add operations rather than multiple migrate() calls;
update the backtick name to something like `failed SFU IDs accumulate when
adding multiple failed IDs` or `failed SFU IDs accumulate across multiple
addFailedSfuId calls` so the function name (the test identifier) matches the
assertions around call.invokeAddFailedSfuId and call.getFailedSfuIds.
stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/retry/StreamRetryPolicy.kt (1)

47-67: Consider: exponential implements additive growth, not true exponential backoff.

The formula prev + retry * backoffStepMillis produces quadratic growth (e.g., 250→750→1500→2500), not exponential (e.g., 250→500→1000→2000). If true exponential is intended:

nextBackOffDelayFunction = { _, prev ->
    (prev * 2).coerceAtMost(maxBackoffMillis)
}

If the current behavior is intentional for compatibility with stream-core-android, a KDoc note clarifying the growth pattern would help future maintainers.
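
To make the two growth patterns in this comment concrete, here is a small sketch that reproduces both example sequences. `additiveDelays` and `doublingDelays` are hypothetical helpers written for this comparison, not functions from StreamRetryPolicy, and the retry index is assumed to start at 2 for the second delay so that the numbers match the example above.

```kotlin
// Additive rule from the review comment: next = prev + retry * step.
fun additiveDelays(stepMillis: Long, count: Int): List<Long> {
    val delays = mutableListOf(stepMillis)
    for (retry in 2..count) delays += delays.last() + retry * stepMillis
    return delays
}

// Doubling rule suggested in the review comment: next = min(prev * 2, max).
fun doublingDelays(stepMillis: Long, maxMillis: Long, count: Int): List<Long> =
    generateSequence(stepMillis) { (it * 2).coerceAtMost(maxMillis) }
        .take(count)
        .toList()
```

With a 250 ms step, `additiveDelays(250, 4)` yields 250, 750, 1500, 2500 (250 times the triangular numbers 1, 3, 6, 10, hence quadratic), while `doublingDelays(250, 10_000, 4)` yields 250, 500, 1000, 2000.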

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/retry/StreamRetryPolicy.kt`
around lines 47 - 67, The current exponential(...) factory in StreamRetryPolicy
uses an additive formula (prev + retry * backoffStepMillis) which yields
quadratic growth; replace nextBackOffDelayFunction with a true exponential step:
compute a base = if (prev <= 0) backoffStepMillis else prev, then next = (base *
2).coerceAtMost(maxBackoffMillis) and ensure it is coerced into the
[backoffStepMillis, maxBackoffMillis] range; update the nextBackOffDelayFunction
in the StreamRetryPolicy.exponential(...) call accordingly so retries grow
exponentially (and keep giveUpFunction and other params unchanged).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/rtc/SfuConnectionRetryTest.kt`:
- Around line 336-357: Rename the test function to reflect that
DisconnectedPermanently will trigger a reconnect regardless of network state;
specifically update the test named `DisconnectedPermanently does not trigger
reconnect when network is down` to something like `DisconnectedPermanently
triggers reconnect even when network is down`, keeping the existing setup that
mocks `mockNetworkStateProvider.isConnected()` to return false, creates the RTC
session via `createRtcSession()`, emits
`SfuSocketState.Disconnected.DisconnectedPermanently` into `socketStateFlow`,
and verifies `mockCall.reconnect` is invoked with
`WebsocketReconnectStrategy.WEBSOCKET_RECONNECT_STRATEGY_REJOIN`.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 77038151-b2b5-42b0-bd92-ece63b714c5e

📥 Commits

Reviewing files that changed from the base of the PR and between 14e119a and 27a3bbf.

📒 Files selected for processing (28)
  • stream-video-android-core/api/stream-video-android-core.api
  • stream-video-android-core/build.gradle.kts
  • stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt
  • stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt
  • stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt
  • stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/connection/Publisher.kt
  • stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/connection/Subscriber.kt
  • stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/utils/RetryableSignalingServiceDecorator.kt
  • stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/utils/SignalLostSignalingServiceDecorator.kt
  • stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/internal/module/SfuConnectionModule.kt
  • stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/internal/network/NetworkStateProvider.kt
  • stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/retry/StreamRetryAttemptInfo.kt
  • stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/retry/StreamRetryPolicy.kt
  • stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/retry/StreamRetryProcessor.kt
  • stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/socket/common/HealthMonitor.kt
  • stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/socket/sfu/SfuSocket.kt
  • stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/socket/sfu/SfuSocketStateService.kt
  • stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/trace/SignalingServiceTracerDecorator.kt
  • stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/call/utils/RetryableSignalingServiceDecoratorTest.kt
  • stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/reconnect/FailedSfuIdsTest.kt
  • stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/reconnect/ReconnectAttemptsCountTest.kt
  • stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/retry/StreamRetryPolicyTest.kt
  • stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/retry/StreamRetryProcessorTest.kt
  • stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/rtc/JoinCallTest.kt
  • stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/rtc/ReconnectDetailsTest.kt
  • stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/rtc/ReconnectEscalationTest.kt
  • stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/rtc/SfuConnectionRetryTest.kt
  • stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/trace/SignalingServiceTracerDecoratorKtTest.kt
💤 Files with no reviewable changes (1)
  • stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/utils/SignalLostSignalingServiceDecorator.kt

Comment on lines 336 to 357
     @Test
-    fun `DISCONNECT strategy disconnects immediately without retries`() = runTest {
+    fun `DisconnectedPermanently does not trigger reconnect when network is down`() = runTest {
+        every { mockNetworkStateProvider.isConnected() } returns false
         createRtcSession()
         advance()

-        socketStateFlow.value = SfuSocketState.Disconnected.DisconnectedTemporarily(
+        socketStateFlow.value = SfuSocketState.Disconnected.DisconnectedPermanently(
             error = Error.NetworkError(
-                message = "SFU wants disconnect",
-                serverErrorCode = 500,
-                statusCode = 500,
+                message = "Socket closed (1002)",
+                serverErrorCode = 1002,
+                statusCode = 1002,
             ),
-            reconnectStrategy = WebsocketReconnectStrategy.WEBSOCKET_RECONNECT_STRATEGY_DISCONNECT,
         )
         advance()

-        coVerify(exactly = 1) { mockSocket.disconnect() }
-        coVerify(exactly = 0) { mockCall.migrate() }
-        coVerify(exactly = 0) { mockCall.rejoin(any()) }
+        coVerify(exactly = 1) {
+            mockCall.reconnect(
+                WebsocketReconnectStrategy.WEBSOCKET_RECONNECT_STRATEGY_REJOIN,
+                any(),
+            )
+        }
     }

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check the actual RtcSession implementation to verify expected behavior for DisconnectedPermanently when network is down
ast-grep --pattern $'DisconnectedPermanently -> {
  $$$
}'

Repository: GetStream/stream-video-android

Length of output: 56


🏁 Script executed:

#!/bin/bash
# Find RtcSession implementation file
find . -type f -name "RtcSession.kt" | head -5

Repository: GetStream/stream-video-android

Length of output: 168


🏁 Script executed:

#!/bin/bash
# Look at the test file to understand context around the failing test and similar tests
cd stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/rtc
ls -la | grep -i test

Repository: GetStream/stream-video-android

Length of output: 509


🏁 Script executed:

#!/bin/bash
# Search for how DisconnectedPermanently is handled in the codebase
rg "DisconnectedPermanently" --type kt -B 3 -A 3 | head -100

Repository: GetStream/stream-video-android

Length of output: 100


🏁 Script executed:

#!/bin/bash
# Search for DisconnectedPermanently with correct kotlin file type
rg "DisconnectedPermanently" --type kotlin -B 3 -A 3 | head -150

Repository: GetStream/stream-video-android

Length of output: 21522


🏁 Script executed:

#!/bin/bash
# Read the test file to see the full test context and similar tests
cat -n stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/rtc/SfuConnectionRetryTest.kt | sed -n '290,370p'

Repository: GetStream/stream-video-android

Length of output: 2869


🏁 Script executed:

#!/bin/bash
# Read RtcSession implementation to understand reconnect logic
wc -l stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt

Repository: GetStream/stream-video-android

Length of output: 171


🏁 Script executed:

#!/bin/bash
# Read relevant parts of RtcSession.kt to understand reconnect logic
cat -n stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt | head -100

Repository: GetStream/stream-video-android

Length of output: 6212


🏁 Script executed:

#!/bin/bash
# Search for the reconnect logic and network check handling
rg "DisconnectedPermanently\|isConnected\(\)" stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt -B 2 -A 2

Repository: GetStream/stream-video-android

Length of output: 56


🏁 Script executed:

#!/bin/bash
# Search for socket state handling in RtcSession
rg "socketStateFlow\|DisconnectedPermanently" stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt -B 2 -A 5 | head -80

Repository: GetStream/stream-video-android

Length of output: 56


🏁 Script executed:

#!/bin/bash
# Look for the reconnect function and network-aware logic
rg "fun reconnect|networkStateProvider|isConnected" stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt -B 2 -A 3 | head -100

Repository: GetStream/stream-video-android

Length of output: 890


🏁 Script executed:

#!/bin/bash
# Find the complete socket state handling block for DisconnectedPermanently
rg "DisconnectedPermanently" stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt -B 1 -A 10

Repository: GetStream/stream-video-android

Length of output: 661


🏁 Script executed:

#!/bin/bash
# Get the context around the socket state flow collection in RtcSession
sed -n '400,600p' stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt | cat -n

Repository: GetStream/stream-video-android

Length of output: 8799


🏁 Script executed:

#!/bin/bash
# Get full socket state handling to see all the branches
sed -n '200,350p' stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt | cat -n

Repository: GetStream/stream-video-android

Length of output: 7413


🏁 Script executed:

#!/bin/bash
# Find the state job that collects socket state changes
rg "stateJob|socketStateFlow.collect" stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt -B 2 -A 30 | head -150

Repository: GetStream/stream-video-android

Length of output: 6306


🏁 Script executed:

#!/bin/bash
# Double-check by viewing the exact DisconnectedPermanently handling again to confirm no network check
sed -n '230,280p' stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt | grep -A 15 "DisconnectedPermanently"

Repository: GetStream/stream-video-android

Length of output: 56


Test name contradicts assertion — rename test to match actual behavior.

The test assertion is correct: DisconnectedPermanently intentionally triggers reconnect with REJOIN strategy regardless of network state. However, the test name "does not trigger reconnect when network is down" is misleading.

The RtcSession implementation shows:

  • DisconnectedTemporarily includes a network check: if (!networkStateProvider.isConnected()) { return@collect }
  • DisconnectedPermanently has no network check and always calls reconnect() with WEBSOCKET_RECONNECT_STRATEGY_REJOIN

The test correctly verifies this behavior, but the name should reflect that DisconnectedPermanently escalates regardless of network availability:

-    fun `DisconnectedPermanently does not trigger reconnect when network is down`() = runTest {
+    fun `DisconnectedPermanently escalates to REJOIN even when network is down`() = runTest {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/rtc/SfuConnectionRetryTest.kt`
around lines 336 - 357, Rename the test function to reflect that
DisconnectedPermanently will trigger a reconnect regardless of network state;
specifically update the test named `DisconnectedPermanently does not trigger
reconnect when network is down` to something like `DisconnectedPermanently
triggers reconnect even when network is down`, keeping the existing setup that
mocks `mockNetworkStateProvider.isConnected()` to return false, creates the RTC
session via `createRtcSession()`, emits
`SfuSocketState.Disconnected.DisconnectedPermanently` into `socketStateFlow`,
and verifies `mockCall.reconnect` is invoked with
`WebsocketReconnectStrategy.WEBSOCKET_RECONNECT_STRATEGY_REJOIN`.

Comment thread stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt Outdated
@rahul-lohra
Contributor

@CodeRabbit review specifically for unhandled exceptions and coroutine misuse in this PR

@rahul-lohra
Contributor

@coderabbitai review specifically for unhandled exceptions and coroutine misuse in this PR

@coderabbitai

coderabbitai Bot commented Apr 15, 2026

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Read the main files of interest for exception/coroutine analysis

echo "=== Call.kt ==="
fd -e kt "Call.kt" --full-path --exclude "test" | grep -v "Test" | head -5
fd -e kt "Call.kt" --full-path | grep -v "Test" | xargs cat 2>/dev/null | head -600

echo ""
echo "=== RtcSession.kt ==="
fd -e kt "RtcSession.kt" | grep -v "Test" | xargs cat 2>/dev/null | head -500

echo ""
echo "=== RetryableSignalingServiceDecorator.kt ==="
fd -e kt "RetryableSignalingServiceDecorator.kt" | xargs cat 2>/dev/null

echo ""
echo "=== StreamRetryProcessor.kt ==="
fd -e kt "StreamRetryProcessor.kt" | grep -v "Test" | xargs cat 2>/dev/null

echo ""
echo "=== HealthMonitor.kt ==="
fd -e kt "HealthMonitor.kt" | grep -v "Test" | xargs cat 2>/dev/null

Length of output: 68609


🏁 Script executed:

#!/bin/bash
echo "=== SfuSocket.kt ==="
fd -e kt "SfuSocket.kt" | grep -v "Test" | xargs cat 2>/dev/null

echo ""
echo "=== SfuSocketStateService.kt ==="
fd -e kt "SfuSocketStateService.kt" | grep -v "Test" | xargs cat 2>/dev/null

echo ""
echo "=== NetworkStateProvider.kt ==="
fd -e kt "NetworkStateProvider.kt" | grep -v "Test" | xargs cat 2>/dev/null

Length of output: 41507


🏁 Script executed:

#!/bin/bash
# Get the reconnect-specific parts of Call.kt
fd -e kt -p "stream-video-android-core/src/main/kotlin" "Call.kt" | xargs grep -n "reconnect\|Mutex\|mutex\|tryLock\|unlock\|launch\|SupervisorJob\|ExceptionHandler\|withTimeout\|leave\|RECONNECT" 2>/dev/null | head -150

echo ""
echo "=== Call.kt reconnect section ==="
fd -e kt -p "stream-video-android-core/src/main/kotlin" "Call.kt" | xargs sed -n '1,50p' 2>/dev/null
# Try to extract the reconnect function
fd -e kt -p "stream-video-android-core/src/main/kotlin" "Call.kt" | xargs grep -n "fun reconnect\|private.*reconnect\|internal.*reconnect\|Mutex\|tryLock\|MAX_FAST\|MAX_RECONNECT\|RECONNECT_DELAY\|reconnectMutex\|sfuReconnect\|ReconnectingFailed\|onDisconnected\|onConnected\|NetworkStateListener\|leaveTimer\|leave(" 2>/dev/null | head -100

Length of output: 319


🏁 Script executed:

#!/bin/bash
# Read Call.kt from stream-video-android-core only
cat stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt | grep -n "fun reconnect\|Mutex\|tryLock\|unlock\|launch {" | head -80

echo ""
echo "--- Full reconnect section ---"
# Extract lines around the reconnect function
awk '/private val reconnectMutex|fun reconnect\b|fun reconnectFast\b|fun reconnectRejoin\b|fun reconnectMigrate\b/{found=1} found{print NR": "$0} /^    \}$/{if(found && depth==0){found=0}}' \
  stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt | head -300

echo ""
echo "--- NetworkStateListener in Call.kt ---"
grep -n -A 30 "NetworkStateListener\|onConnected\|onDisconnected\|leaveTimer\|atomicLeave\|leave(" \
  stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt | head -150

Length of output: 8593


🏁 Script executed:

#!/bin/bash
# Read the actual reconnect methods and surrounding launch{} calls
sed -n '780,940p' stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt

echo ""
echo "=== reconnectFast ==="
sed -n '935,1020p' stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt

echo ""
echo "=== reconnectRejoin ==="
sed -n '1020,1080p' stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt

echo ""
echo "=== reconnectMigrate ==="
sed -n '1020,1120p' stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt

echo ""
echo "=== fire-and-forget launches ==="
sed -n '1130,1160p' stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt
sed -n '1245,1265p' stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt
sed -n '1760,1810p' stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt
sed -n '1920,1960p' stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt

Length of output: 22232


@rahul-lohra Here is the focused review for unhandled exceptions and coroutine misuse across the PR's new/changed files.


🔴 Critical

1. reconnect() swallows CancellationException (Call.kt Line ~895)

try {
    when (currentStrategy) { ... reconnectFast(reason) ... }
    break
} catch (error: Exception) {   // ← CancellationException is a subclass of Exception
    delay(RECONNECT_DELAY_MS)
    attempt++
    ...
}

CancellationException is a subclass of Exception in Kotlin. If the coroutine is cancelled (e.g. the user calls leave()) while inside reconnectFast/Rejoin/Migrate, the cancellation signal is caught, the loop delays and escalates strategy, and keeps running. This is the most severe coroutine misuse in the PR — the reconnect loop becomes uncancellable.

Fix: Re-throw CancellationException before the generic catch, or catch Exception only after filtering:

} catch (cancel: CancellationException) {
    throw cancel
} catch (error: Exception) {
    ...
}
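
The catch ordering suggested above can be sketched without any coroutine library, since `kotlin.coroutines.cancellation.CancellationException` is part of the Kotlin standard library. This is a minimal illustration, not the PR's code: `retryLoop` and its parameters are hypothetical stand-ins for the reconnect loop, and a real loop would also delay and escalate the strategy.

```kotlin
import java.io.IOException
import kotlin.coroutines.cancellation.CancellationException

// Hypothetical stand-in for the reconnect loop: calls `attempt` until it
// succeeds or `maxAttempts` is reached, and returns the number of attempts made.
fun retryLoop(maxAttempts: Int, attempt: (Int) -> Unit): Int {
    var attempts = 0
    while (attempts < maxAttempts) {
        attempts++
        try {
            attempt(attempts)
            break // success: leave the loop
        } catch (cancel: CancellationException) {
            // Cancellation is not a failure to retry; let it propagate.
            throw cancel
        } catch (error: Exception) {
            // Transient failure: fall through and try again.
        }
    }
    return attempts
}

fun main() {
    // Two transient failures, then success on the third attempt.
    val attempts = retryLoop(3) { n -> if (n < 3) throw IOException("transient") }
    println("attempts=$attempts")

    // A cancellation escapes the loop instead of being retried.
    val propagated = try {
        retryLoop(3) { throw CancellationException("call left") }
        false
    } catch (e: CancellationException) {
        true
    }
    println("cancellation propagated=$propagated")
}
```

The order of the two catch clauses matters: if the generic `catch (error: Exception)` came first, the cancellation branch would never be reached.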

2. TOCTOU session.value!! force-unwrap in reconnectRejoin (Call.kt Line ~987)

val oldSession = this.session.value!! // ← force-unwrap

session is checked non-null earlier, but between that check and this dereference another coroutine (e.g. leave()) can set session.value = null, causing a NullPointerException. The same issue exists in reconnectMigrate() at the corresponding line.

Fix: Capture and null-check in a single step:

val oldSession = this.session.value
    ?: throw IllegalStateException("Session was cleared before rejoin could proceed")
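
To illustrate why a single read avoids the race, here is a small sketch that uses `AtomicReference` as a stand-in for the session holder. The names `Session`, `SessionHolder`, and `sessionForRejoin` are hypothetical; the real code uses a state flow, which this sketch does not model.

```kotlin
import java.util.concurrent.atomic.AtomicReference

class Session(val id: String)

// Hypothetical holder standing in for the nullable session reference in Call.
class SessionHolder {
    val session = AtomicReference<Session?>(null)

    // Reads the reference exactly once. After this line, a concurrent
    // `session.set(null)` cannot turn `old` into null.
    fun sessionForRejoin(): Session {
        val old = session.get()
            ?: throw IllegalStateException("Session was cleared before rejoin could proceed")
        return old
    }
}

fun main() {
    val holder = SessionHolder()
    holder.session.set(Session("s1"))

    val captured = holder.sessionForRejoin()
    holder.session.set(null) // simulates leave() racing with the rejoin
    println(captured.id)     // the captured reference is still usable

    val failed = runCatching { holder.sessionForRejoin() }.isFailure
    println("second capture failed=$failed")
}
```

The check-then-dereference pattern reads the holder twice, which is where the window opens. Capturing into a local `val` closes it, and the failure becomes an explicit, descriptive exception.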

🟠 Significant

3. Unhandled exceptions in fire-and-forget reconnect launches — RtcSession.kt

onSfuApiError = { error ->
    call.scope.launch {                     // ← no CoroutineExceptionHandler
        call.reconnect(strategy, "SfuApiError:...")
    }
},
onSfuNetworkFailure = { throwable ->
    call.scope.launch {                     // ← no CoroutineExceptionHandler
        call.reconnect(FAST, "SfuNetworkFailure:...")
    }
},

call.scope uses a SupervisorJob so a thrown exception won't cancel sibling coroutines, but it will hit the thread's uncaught exception handler (potentially crashing the app). Any exception escaping reconnect() before its own internal try block (e.g. if the early guard isDestroyed check throws) is fully unhandled.

Fix:

call.scope.launch(CoroutineExceptionHandler { _, t ->
    logger.e(t) { "Unhandled exception in reconnect coroutine" }
}) {
    call.reconnect(strategy, reason)
}

4. Throwing network listener crashes coroutine silently — NetworkStateProvider.kt

private fun Set<NetworkStateListener>.onConnected() {
    scope.launch {                    // ← no error handling
        forEach { it.onConnected() }  // ← a single throwing listener kills this Job
    }
}

If any subscriber's onConnected() or onDisconnected() throws, the launched coroutine dies silently — subsequent listeners in the set are not notified, and the connectivity transition is partially processed.

Fix:

scope.launch {
    forEach {
        try { it.onConnected() } catch (e: Exception) {
            logger.e(e) { "NetworkStateListener.onConnected threw" }
        }
    }
}
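
The per-listener isolation can be shown in plain Kotlin. This sketch assumes the listener callback is a regular (non-suspending) function; `notifyConnected` and the `onError` parameter are hypothetical names. If the callback were a suspending function, `CancellationException` would need to be rethrown rather than logged.

```kotlin
// Simplified listener interface, assumed for this sketch only.
interface NetworkStateListener {
    fun onConnected()
}

// Notifies every listener; a throwing listener is reported through `onError`
// and does not prevent the remaining listeners from being called.
// Returns how many listeners completed without throwing.
fun notifyConnected(
    listeners: Set<NetworkStateListener>,
    onError: (Exception) -> Unit,
): Int {
    var notified = 0
    for (listener in listeners) {
        try {
            listener.onConnected()
            notified++
        } catch (e: Exception) {
            onError(e)
        }
    }
    return notified
}

fun main() {
    val calls = mutableListOf<String>()
    val listeners = linkedSetOf<NetworkStateListener>(
        object : NetworkStateListener {
            override fun onConnected() { calls += "first" }
        },
        object : NetworkStateListener {
            override fun onConnected() { throw IllegalStateException("boom") }
        },
        object : NetworkStateListener {
            override fun onConnected() { calls += "third" }
        },
    )
    val errors = mutableListOf<Exception>()
    val notified = notifyConnected(listeners) { errors += it }
    println("notified=$notified errors=${errors.size} calls=$calls")
}
```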

5. SfuSocket socket listener flow has no .catch{} (SfuSocket.kt)

socketListenerJob = listen().onEach {
    when (it) {
        is StreamWebSocketEvent.Error -> handleError(it)  // can throw
        is StreamWebSocketEvent.SfuMessage -> ...
        else -> handleEvent(UnknownEvent(it))
    }
}.launchIn(userScope)   // ← no .catch{} before launchIn

If handleError or handleEvent throw (e.g. during error parsing), the flow terminates and socketListenerJob completes without recovery. New socket events are silently dropped.

Fix: Add .catch {} before .launchIn:

socketListenerJob = listen().onEach { ... }
    .catch { e -> logger.e(e) { "Socket listener flow terminated unexpectedly" } }
    .launchIn(userScope)

🟡 Minor

6. leaveTimeoutAfterDisconnect launch has no error handler — Call.kt Line ~386

leaveTimeoutAfterDisconnect = scope.launch {
    delay(clientImpl.leaveAfterDisconnectSeconds * 1000)
    ...
    leave()   // if leave() throws, exception is unhandled
}

7. Debug class launches with no error handler — Call.kt Lines ~1928–1952

public fun rejoin(reason: String = "Debug") {
    call.scope.launch { call.rejoin(reason) }   // no handler
}
fun migrate() {
    call.scope.launch { call.migrate() }         // no handler
}
fun fastReconnect(reason: String = "Debug") {
    call.scope.launch { call.fastReconnect(reason) }   // no handler
}

These are debug helpers, but since call.scope uses SupervisorJob, uncaught exceptions still hit the global uncaught handler.

8. Opaque throw Exception(...) in reconnectRejoin/reconnectMigrate (Call.kt)

throw Exception("Failed to get join response: ${joinResponse.errorOrNull()}")

Wrapping in a plain Exception with a string message loses the typed error. catch (error: Exception) in the caller can't distinguish this programmatically. Consider a typed exception (e.g. ReconnectException) so the escalation logic can be more precise.

9. RetryableSignalingServiceDecorator.retryCall: onNetworkFailure then rethrow — RetryableSignalingServiceDecorator.kt

onNetworkFailure(throwable)
throw throwable

If onNetworkFailure itself throws, the original throwable is swallowed. Wrap with try/catch:

try { onNetworkFailure(throwable) } catch (e: Exception) { logger.w(e) { ... } }
throw throwable

Summary table

# | File | Severity | Issue
1 | Call.kt ~L895 | 🔴 Critical | catch (Exception) swallows CancellationException in reconnect loop
2 | Call.kt ~L987, ~L1045 | 🔴 Critical | session.value!! force-unwrap TOCTOU race in reconnectRejoin/reconnectMigrate
3 | RtcSession.kt | 🟠 Significant | launch { reconnect() } with no CoroutineExceptionHandler
4 | NetworkStateProvider.kt | 🟠 Significant | Listener loop in scope.launch{} with no error handler
5 | SfuSocket.kt | 🟠 Significant | Flow .onEach{}.launchIn() missing .catch{}
6 | Call.kt ~L386 | 🟡 Minor | leaveTimeoutAfterDisconnect launch has no error handler
7 | Call.kt ~L1928–52 | 🟡 Minor | Debug launches with no error handler
8 | Call.kt | 🟡 Minor | throw Exception(...) loses typed error info
9 | RetryableSignalingServiceDecorator.kt | 🟡 Minor | onNetworkFailure throw can suppress original exception

@coderabbitai

coderabbitai Bot commented Apr 15, 2026

Oops, something went wrong! Please try again later. 🐰 💔

@rahul-lohra
Contributor

@coderabbitai review this PR. Can you review it for the following:

  1. The code should flow in one direction (unidirectional flow)
  2. An enclosed class should not need to keep a reference to its outer class. You can suggest callbacks or flows instead

@coderabbitai

coderabbitai Bot commented Apr 16, 2026

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
echo "=== RtcSession constructor and Call references ==="
grep -n "val call\|var call\|call\.\|Call(" stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt | head -80

echo ""
echo "=== SfuConnectionModule constructor ==="
cat stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/internal/module/SfuConnectionModule.kt

echo ""
echo "=== NetworkStateListener - who implements it ==="
grep -rn "NetworkStateListener\|implements NetworkStateListener\|: NetworkStateListener" stream-video-android-core/src/main/kotlin/ | grep -v "Test" | head -30

echo ""
echo "=== Call.kt inner classes and Debug class ==="
grep -n "inner class\|class Debug\|class Network\|call\.\|this@Call\|val call" stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt | head -60

echo ""
echo "=== Who creates RtcSession and passes what ==="
grep -n "RtcSession(" stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt | head -20

echo ""
echo "=== SfuConnectionModule creation in RtcSession ==="
grep -n "SfuConnectionModule\|onSfuApiError\|onSfuNetworkFailure\|onSignalingLost" stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt | head -40

Length of output: 17347


🏁 Script executed:

#!/bin/bash
echo "=== RtcSession class header and constructor ==="
sed -n '1,100p' stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt

echo ""
echo "=== Call.kt - where it creates/uses RtcSession and NetworkStateListener ==="
grep -n "NetworkStateListener\|networkListener\|onConnected\|onDisconnected\|RtcSession\|this@Call\|inner class\|companion object" stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt | head -60

echo ""
echo "=== Publisher constructor - who is passed ==="
sed -n '1,60p' stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/connection/Publisher.kt

echo ""
echo "=== Subscriber constructor ==="
sed -n '1,60p' stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/connection/Subscriber.kt

Length of output: 13538


@rahul-lohra Here is the focused review for unidirectional data flow and enclosing-class back-references in this PR.


1. RtcSession holds a full Call reference — bidirectional dependency

File: RtcSession.kt Line ~219

internal class RtcSession(
    ...
    private val call: Call,     // ← entire Call object pulled in
    ...
)

RtcSession is a lower-level component that should only receive commands and report results upward. Instead, it uses call.* in 40+ places:

Usage type | Examples
Upward method calls | call.reconnect(...), call.rejoin(...), call.onSfuConnectionEstablished()
State mutation | call.state._connection.value = RealtimeConnection.Connected (Line ~668, 680)
Resource access | call.peerConnectionFactory, call.mediaManager, call.clientCapabilities
Scope borrowing | call.scope.launch { call.reconnect(...) }

This creates a circular dependency: Call → RtcSession → Call. The direction of data and command flow is ambiguous — both objects own each other.

Suggested fix: Define a narrow upward-communication interface and pass only that to RtcSession:

// New interface — only what RtcSession needs to tell Call
internal interface RtcSessionDelegate {
    val connectionState: StateFlow<RealtimeConnection>
    val ownCapabilities: StateFlow<List<OwnCapability>>
    suspend fun onSfuConnected()
    suspend fun reconnect(strategy: WebsocketReconnectStrategy, reason: String)
    fun getOrCreateParticipant(sessionId: String): ParticipantState
    // ...
}

// Call implements it:
internal class Call : RtcSessionDelegate { ... }

// RtcSession receives only the interface:
internal class RtcSession(
    private val delegate: RtcSessionDelegate,   // ← narrow contract, not the full Call
    ...
)

State mutations like call.state._connection.value = ... should move entirely into Call, triggered by callbacks/events raised by RtcSession:

// RtcSession signals upward:
delegate.onSfuConnected()

// Call handles the state mutation in its own layer:
override suspend fun onSfuConnected() {
    state._connection.value = RealtimeConnection.Connected
}

2. SfuConnectionModule lambdas capture call — implicit back-reference

File: RtcSession.kt Lines ~254–269

private val sfuConnectionModuleProvider: () -> SfuConnectionModule = {
    SfuConnectionModule(
        ...
        onSfuApiError = { error ->
            if (call.state.connection.value is RealtimeConnection.Disconnected) return@SfuConnectionModule
            call.scope.launch {
                call.reconnect(strategy, "SfuApiError:...")  // ← call captured in lambda
            }
        },
        onSfuNetworkFailure = { throwable ->
            if (call.state.connection.value is RealtimeConnection.Disconnected) return@SfuConnectionModule
            call.scope.launch {
                call.reconnect(...)  // ← call captured in lambda
            }
        },
    )
}

SfuConnectionModule should be completely unaware of Call. The lambdas passed to it are closures over call, making the module indirectly depend on the top-level Call. This also means the module holds a strong reference to Call for its lifetime.

Suggested fix: Keep the lambda abstraction but resolve call only at the RtcSession-to-delegate boundary:

onSfuApiError = { error ->
    if (delegate.connectionState.value is RealtimeConnection.Disconnected) return@SfuConnectionModule
    coroutineScope.launch {
        delegate.reconnect(strategy, "SfuApiError:...")  // ← delegate, not call
    }
},

3. Call.Debug is a public nested class holding val call: Call

File: Call.kt Line 1917

public class Debug(val call: Call) {       // ← val call is public
    fun rejoin(reason: String = "Debug") {
        call.scope.launch { call.rejoin(reason) }
    }
    fun migrate() {
        call.scope.launch { call.migrate() }
    }
    fun fastReconnect(reason: String = "Debug") {
        call.scope.launch { call.fastReconnect(reason) }
    }
    // ...
}
  • val call is public, so any consumer that holds a Debug instance can reach the entire Call object — including internal state, session, media tracks, etc.
  • The nested class is tightly coupled to the outer class through call.scope.launch {}, which is the outer class's lifecycle object.
  • If a Debug instance is retained anywhere (e.g. a test framework, a UI debug panel), it prevents garbage collection of Call.

Suggested fix: Replace the Call reference with typed callback parameters:

public class Debug(
    private val onRejoin: (reason: String) -> Unit,
    private val onMigrate: () -> Unit,
    private val onFastReconnect: (reason: String) -> Unit,
    private val onDisableSubscriber: () -> Unit,
    private val onEnableSubscriber: () -> Unit,
    private val onRestartSubscriberIce: () -> Unit,
    private val onRestartPublisherIce: () -> Unit,
    private val onSimulateSfuFull: () -> Unit,
) {
    fun rejoin(reason: String = "Debug") = onRejoin(reason)
    fun migrate() = onMigrate()
    fun fastReconnect(reason: String = "Debug") = onFastReconnect(reason)
}

And build it at the Call layer:

val debug = Debug(
    onRejoin = { reason -> scope.launch { rejoin(reason) } },
    onMigrate = { scope.launch { migrate() } },
    onFastReconnect = { reason -> scope.launch { fastReconnect(reason) } },
    // ...
)

This way Debug holds no knowledge of Call at all.


4. RtcSession directly mutates Call's internal state — ownership violation

File: RtcSession.kt Lines ~668, 680

call.state._connection.value = RealtimeConnection.Connected  // inside RtcSession
call.state._connection.value = RealtimeConnection.Reconnecting

_connection is a MutableStateFlow that belongs to Call. Mutating it from RtcSession means two separate classes share ownership of the same state. The state diagram becomes impossible to reason about — either class can change the connection state at any time.

Suggested fix: RtcSession should emit an event/callback; Call alone should translate that into a state update:

// In RtcSession — signal upward:
delegate.onSfuConnected()

// In Call — owns the mutation:
override suspend fun onSfuConnected() {
    state._connection.value = RealtimeConnection.Connected
}

Dependency direction summary

Current (problematic):

Call ──creates──▶ RtcSession
 ▲                    │
 └────call.*──────────┘   ← circular

Target (unidirectional):

Call ──creates──▶ RtcSession
 │                    │
 │ implements         │ calls via
 ▼                    ▼
RtcSessionDelegate (interface)
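
The target direction can be sketched with plain classes. Everything here is a simplified assumption for illustration: `SessionDelegate`, `SessionModel`, and `CallModel` are hypothetical names, the callbacks are non-suspending, and the connection state is a plain property rather than a state flow.

```kotlin
enum class ConnectionState { Connecting, Connected, Reconnecting }

// Narrow upward contract: the only things the session may tell its owner.
interface SessionDelegate {
    fun onSfuConnected()
    fun onReconnectNeeded(reason: String)
}

// The lower-level component sees the interface, never the owning class.
class SessionModel(private val delegate: SessionDelegate) {
    fun socketConnected() = delegate.onSfuConnected()
    fun socketLost() = delegate.onReconnectNeeded("socket lost")
}

// The owner implements the contract and is the single writer of its state.
class CallModel : SessionDelegate {
    var connection: ConnectionState = ConnectionState.Connecting
        private set
    val reconnectReasons = mutableListOf<String>()
    val session = SessionModel(this)

    override fun onSfuConnected() {
        connection = ConnectionState.Connected
    }

    override fun onReconnectNeeded(reason: String) {
        connection = ConnectionState.Reconnecting
        reconnectReasons += reason
    }
}

fun main() {
    val call = CallModel()
    call.session.socketConnected()
    println(call.connection)
    call.session.socketLost()
    println("${call.connection} ${call.reconnectReasons}")
}
```

Because `connection` has a private setter, the session cannot mutate it even if it somehow obtained a `CallModel` reference, which is the ownership property the review asks for.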

🧠 Learnings used
Learnt from: aleksandar-apostolov
Repo: GetStream/stream-video-android PR: 1587
File: stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt:795-798
Timestamp: 2025-12-19T14:10:16.577Z
Learning: In the `rejoin()` function in `stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt`, the design invariant is that `rejoin()` should only be called when there is an active session (`this.session != null`). Therefore, using `this.session!!` is safe and intentional.

Learnt from: CR
Repo: GetStream/stream-video-android PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-19T09:15:37.269Z
Learning: Applies to **/*.{kt,kts} : Ensure cleanup/teardown paths handle cancellation and failure (important for sockets, queues, retries)

Learnt from: rahul-lohra
Repo: GetStream/stream-video-android PR: 1614
File: stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallBusyHandler.kt:40-44
Timestamp: 2026-02-18T12:33:02.111Z
Learning: In the `CallBusyHandler` class in `stream-video-android-core`, fire-and-forget rejection calls (using `streamVideo.scope.launch` to call `reject(RejectReason.Busy)`) without explicit error handling are acceptable for the busy call rejection feature.

Learnt from: CR
Repo: GetStream/stream-video-android PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-19T09:15:37.269Z
Learning: Applies to **/*.{kt,kts} : Keep concurrency deterministic—use structured coroutines and avoid global scope

Learnt from: CR
Repo: GetStream/stream-video-android PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-19T09:15:37.269Z
Learning: Applies to **/*.{kt,kts} : Keep critical RTC paths off the main thread; prefer coroutines with structured scopes

@aleksandar-apostolov aleksandar-apostolov changed the title fix(core): Unify reconnect pipeline and harden network transition handling Unify reconnect pipeline and harden network transition handling Apr 17, 2026
* this to [finalizeMigration] because the event loop is still needed while
* the new session is being established.
*/
private fun cancelActiveWork(cancelEventJob: Boolean = true) {
Contributor

Question: the old prepareRejoin() also called sfuConnectionModule.socketConnection.disconnect() and closed publisher/subscriber peer connections. Those calls aren't in cancelActiveWork() — are they handled by every caller separately, or were they intentionally dropped?

Contributor Author

The socket disconnect and publisher/subscriber close were not dropped — they're in cleanup(), which is called by each path that needs full teardown:

  • Rejoin: prepareRejoin() → cancelActiveWork(), then leaveWithReason() → cleanup() (full teardown)

  • Migrate: enterMigration() → cancelActiveWork(cancelEventJob=false), then finalizeMigration() → cleanup() (deferred teardown — keeps event loop alive until new session connects)

  • Fast reconnect: prepareReconnect() → cancelActiveWork() + socket disconnect only. Peer connections are intentionally kept open since fast reconnect reuses them on the same SFU.

cancelActiveWork() is intentionally limited to cancelling jobs and the serial processor. It's the common subset shared by all three paths. The socket/peer-connection teardown varies per strategy, so it stays in the caller-specific methods.
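
The split described above can be modeled as a small lookup from strategy to teardown steps. This is only a sketch of the explanation in this comment: `teardownSteps` is a hypothetical helper, and the step strings are labels for the methods named above, not calls into the real code.

```kotlin
enum class Strategy { FAST, REJOIN, MIGRATE }

// Returns the teardown steps for a strategy, in order.
fun teardownSteps(strategy: Strategy): List<String> {
    val steps = mutableListOf<String>()

    // Common subset shared by all three paths: cancel jobs and the serial
    // processor. Migration keeps the event loop alive for now.
    if (strategy == Strategy.MIGRATE) {
        steps.add("cancelActiveWork(cancelEventJob=false)")
    } else {
        steps.add("cancelActiveWork()")
    }

    // Strategy-specific part: how much of the connection is torn down.
    when (strategy) {
        // Peer connections stay open; only the socket is dropped.
        Strategy.FAST -> steps.add("disconnect socket")
        // Full teardown before joining again.
        Strategy.REJOIN -> steps.add("cleanup()")
        // Same teardown, but deferred until the new session has connected.
        Strategy.MIGRATE -> steps.add("cleanup() after new session connects")
    }
    return steps
}

fun main() {
    for (strategy in Strategy.values()) {
        println("$strategy -> ${teardownSteps(strategy)}")
    }
}
```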

@sonarqubecloud

Quality Gate failed

Failed conditions
57.0% Coverage on New Code (required ≥ 80%)

See analysis details on SonarQube Cloud

Comment thread stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt Outdated
…igrate now throw ReconnectPreconditionException instead of IllegalStateException.

Catch block now has three layers in order:

1. CancellationException → re-throw (coroutine cancellation)
2. ReconnectPreconditionException → log + set ReconnectingFailed + break (terminal, no retry)
3. Exception → retry with escalation (transient failures)
…ectOutcome

Replace ReconnectPreconditionException and try-catch control flow in the
reconnect loop with a sealed class ReconnectOutcome (Success,
PreconditionNotMet, PeerConnectionStale, Failed). Each reconnect method
now returns an outcome instead of throwing, and the loop dispatches via
an exhaustive `when` — the compiler enforces that every case is handled.

Made-with: Cursor
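
A minimal sketch of the pattern this commit describes, for illustration only. The four variant names come from the commit message; their payloads, the `LoopAction` enum, and `nextAction` are assumptions and may not match Call.kt.

```kotlin
// Sketch: variant names are from the commit message, payloads are assumed.
sealed class ReconnectOutcome {
    object Success : ReconnectOutcome()
    data class PreconditionNotMet(val reason: String) : ReconnectOutcome()
    object PeerConnectionStale : ReconnectOutcome()
    data class Failed(val cause: Throwable) : ReconnectOutcome()
}

// Hypothetical decisions the reconnect loop could take for each outcome.
enum class LoopAction { EXIT_CONNECTED, EXIT_FAILED, ESCALATE_TO_REJOIN, RETRY }

// `when` is used as an expression over a sealed type, so the compiler
// rejects this function if a variant is added and not handled here.
fun nextAction(outcome: ReconnectOutcome): LoopAction = when (outcome) {
    ReconnectOutcome.Success -> LoopAction.EXIT_CONNECTED
    is ReconnectOutcome.PreconditionNotMet -> LoopAction.EXIT_FAILED // terminal, no retry
    ReconnectOutcome.PeerConnectionStale -> LoopAction.ESCALATE_TO_REJOIN
    is ReconnectOutcome.Failed -> LoopAction.RETRY // transient failure
}
```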
…aths

Add SfuConnectionResult sealed class in RtcSession so fastReconnect and
connectAndAwait return typed outcomes instead of throwing. The public
connect() now delegates to connectAndAwait, eliminating duplicated
request-building, tracing, and socket-await logic. fastReconnect is
reduced to its unique pre/post work (peer-connection health check,
subscription restore). Call.kt maps SfuConnectionResult → ReconnectOutcome
without try-catch.

Made-with: Cursor
Comment thread stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt Outdated
this.session.value = newSession

return try {
val result = newSession.internalConnect(reconnectDetails, currentOptions)
Contributor

Looking at the function signature, nothing indicates that it can throw an exception. The function signature is dishonest. This does not seem scalable.

I will try to look for a way to resolve this.

A quick way to resolve this would be to rename the method to something like newSession.internalConnectWithException, so the caller knows to look inside it for exceptions.

Contributor Author

Changing the internalConnect method so that it does not throw anything.

Contributor Author

We need a try-finally here because finalizeMigration() must run even on cancellation. internalConnect can still throw a CancellationException, in which case the migration still has to be cleaned up.
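
To illustrate the shape being argued for, here is a non-suspending simplification. `migrateWithCleanup`, `connectStep`, and `onFinalize` are hypothetical stand-ins for the migrate path, internalConnect, and finalizeMigration(); the real methods are suspend functions and are not reproduced here.

```kotlin
// Hypothetical stand-ins: connectStep models internalConnect,
// onFinalize models finalizeMigration().
fun <T> migrateWithCleanup(connectStep: () -> T, onFinalize: () -> Unit): T =
    try {
        connectStep()
    } finally {
        // Runs on normal return and on any throwable, including a
        // CancellationException, which then continues to propagate.
        onFinalize()
    }
```

Because there is no catch clause, a cancellation is neither swallowed nor converted into a result; the finally block only guarantees the cleanup.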

Comment thread stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt Outdated
…nternalConnect in _join

- Rename leaveWithReason to sendLeaveEvent and make it a suspend fun
  that awaits the send instead of fire-and-forget via launch. This
  fixes the race where cleanup() cancelled the supervisor job before
  the leave message was sent.
- Move sendLeaveEvent call in Call.internalLeave inside the
  clientImpl.scope.launch block so it completes before cleanup().
- Reorder prepareRejoin: sendLeaveEvent before cancelActiveWork/cleanup.
- Deprecate RtcSession.connect() in favor of internalConnect() which
  returns SfuConnectionResult instead of throwing.
- Update Call._join to use internalConnect directly, replacing the
  try-catch block with an exhaustive when on SfuConnectionResult.
- Split SfuConnectionResult into two sealed classes: SfuConnectionResult
  (Connected/Failed) for internalConnect, FastReconnectResult
  (Connected/PeerConnectionStale/Failed) for fastReconnect.
- Remove redundant try-catch blocks around joinRequest and collectStats
  in reconnectFast/reconnectRejoin/reconnectMigrate.
- Replace withTimeout with withTimeoutOrNull in internalConnect to
  correctly distinguish timeouts from external cancellation.
- Remove unused CancellationException import from Call.kt.

Made-with: Cursor
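
The split into two result types might look roughly like this. The type and variant names are taken from the commit message; the `reason` payloads, the trimmed-down `ReconnectOutcome` (PreconditionNotMet is left out for brevity), and the `toOutcome` helpers are assumptions for illustration.

```kotlin
// Result of a plain connect: a stale peer connection cannot occur here.
sealed class SfuConnectionResult {
    object Connected : SfuConnectionResult()
    data class Failed(val reason: String) : SfuConnectionResult()
}

// Result of a fast reconnect, which reuses existing peer connections.
sealed class FastReconnectResult {
    object Connected : FastReconnectResult()
    object PeerConnectionStale : FastReconnectResult()
    data class Failed(val reason: String) : FastReconnectResult()
}

// Trimmed-down outcome type used by the (hypothetical) loop.
sealed class ReconnectOutcome {
    object Success : ReconnectOutcome()
    object PeerConnectionStale : ReconnectOutcome()
    data class Failed(val reason: String) : ReconnectOutcome()
}

// Mapping without try-catch: each `when` is exhaustive over its own type.
fun SfuConnectionResult.toOutcome(): ReconnectOutcome = when (this) {
    SfuConnectionResult.Connected -> ReconnectOutcome.Success
    is SfuConnectionResult.Failed -> ReconnectOutcome.Failed(reason)
}

fun FastReconnectResult.toOutcome(): ReconnectOutcome = when (this) {
    FastReconnectResult.Connected -> ReconnectOutcome.Success
    FastReconnectResult.PeerConnectionStale -> ReconnectOutcome.PeerConnectionStale
    is FastReconnectResult.Failed -> ReconnectOutcome.Failed(reason)
}
```

Keeping PeerConnectionStale out of SfuConnectionResult means callers of a plain connect never have to handle a case that cannot happen on that path.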
…etwork availability

After a successful fast reconnect, the publisher and subscriber ICE
connections may be stale because the underlying network path changed
(e.g. WiFi ↔ cellular). This adds explicit ICE restarts for both
directions so fresh candidates are gathered and media resumes.

Also introduces a network availability check in the Call.reconnect()
loop to avoid burning limited FAST attempt budgets when the network
is down. Skipped attempts are not counted, preserving the full retry
budget for when connectivity returns.

Key changes:
- RtcSession.restartIceAfterFastReconnect() restarts publisher and
  subscriber ICE after a successful fast reconnect
- RtcSession.fastReconnect() now uses isClosed() instead of
  isFailedOrClosed() so FAILED peer connections (recoverable via ICE
  restart) are not prematurely escalated to REJOIN
- StreamPeerConnection.isClosed() distinguishes truly CLOSED (needs
  REJOIN) from FAILED (recoverable)
- Call.reconnect() skips FAST attempts when network is unavailable
- Call.collectStats() wrapped in runCatching to avoid crashes during
  reconnection
- Companion object constants documented with KDoc
- Tests added/updated for all new behavior

Made-with: Cursor
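
Two of the behaviours in this commit can be sketched in isolation. Everything below is a hypothetical model: `PeerState` stands in for the peer-connection state, the two predicates take a state value instead of living on StreamPeerConnection, and `countedFastAttempts` only demonstrates the "skipped attempts are not counted" rule.

```kotlin
// Hypothetical stand-in for the peer-connection state.
enum class PeerState { CONNECTED, FAILED, CLOSED }

// Old check: FAILED was treated like CLOSED and escalated to REJOIN.
fun isFailedOrClosed(state: PeerState): Boolean =
    state == PeerState.FAILED || state == PeerState.CLOSED

// New check: only CLOSED escalates; FAILED is left to the ICE restart.
fun isClosed(state: PeerState): Boolean = state == PeerState.CLOSED

// Each list entry is one pass of the loop and says whether the network
// was available. Passes without network do not consume the FAST budget.
fun countedFastAttempts(networkAvailable: List<Boolean>, maxAttempts: Int): Int {
    var attempts = 0
    for (available in networkAvailable) {
        if (attempts >= maxAttempts) break
        if (!available) continue // skipped, budget untouched
        attempts++
    }
    return attempts
}
```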
… reconnect loop bugs

Strip self-reconnection logic from SfuSocket so it becomes a passive
state reporter. All SFU socket state changes (WebSocketEventLost,
DisconnectedTemporarily, NetworkDisconnected) now route through
RtcSession.stateJob into Call.reconnect(), eliminating the race
condition that caused double-joins and SCHEDULED_CLEANUP peer drops.

Key changes:
- Remove SfuSocket's networkStateListener and self-reconnect paths
- Route WebSocketEventLost from RtcSession.stateJob to Call.reconnect()
- Guard prepareReconnect() against disconnecting an already-disconnected socket
- Fix strategy downgrade bug: REJOIN no longer falls back to FAST
- Increment reconnectAttempts only for REJOIN/MIGRATE (not FAST/UNSPECIFIED)
- Leave the call when ReconnectingFailed is reached (prevent zombie calls)
- Remove redundant inner-loop network guard (entry-point checks suffice)
- Deprecate RestartConnection, NetworkAvailable, and RestartReason for
  binary compatibility
- Add high-level reconnection logic documentation

Made-with: Cursor
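
The escalation and counting rules from this commit, as a rough sketch. The strategy names match the PR; `nextStrategy`, `countsTowardAttempts`, the default of 3 FAST attempts (taken from the PR description), and the handling of UNSPECIFIED are assumptions, and the FAST deadline is omitted.

```kotlin
enum class Strategy { UNSPECIFIED, FAST, MIGRATE, REJOIN }

// Strategy to use after a failed attempt. Escalation is one-way:
// REJOIN never falls back to FAST.
fun nextStrategy(current: Strategy, failedFastAttempts: Int, maxFast: Int = 3): Strategy =
    when (current) {
        Strategy.FAST -> if (failedFastAttempts >= maxFast) Strategy.REJOIN else Strategy.FAST
        Strategy.MIGRATE -> Strategy.REJOIN
        Strategy.REJOIN -> Strategy.REJOIN
        Strategy.UNSPECIFIED -> Strategy.UNSPECIFIED // assumption: left unchanged
    }

// Only REJOIN and MIGRATE attempts increment reconnectAttempts.
fun countsTowardAttempts(strategy: Strategy): Boolean =
    strategy == Strategy.REJOIN || strategy == Strategy.MIGRATE
```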
…nto fix/cleanup-old-rtc-session-on-migration
Labels

pr:improvement Enhances an existing feature or code

3 participants