
Add StreamTelemetry capabilities to be reused across products #59

Open
aleksandar-apostolov wants to merge 7 commits into develop from feature/telemetry

Conversation


@aleksandar-apostolov aleksandar-apostolov commented Apr 24, 2026

Goal

Introduce a general-purpose telemetry engine in core that product SDKs (Chat, Video, Feeds) can use to collect, buffer, and drain operational signals.

Implementation

API layer (api/telemetry/):

  • StreamSignal — data class: tag + data + timestamp
  • StreamTelemetryScope — named scope with emit() and drain()
  • StreamTelemetry — engine interface, factory function, and StreamTelemetryNoOp
  • StreamSignalRedactor — anonymization hook that runs on emit()
  • StreamTelemetryConfig — memory/disk capacity, configurable paths, SDK-versioned disk layout
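As a rough sketch, the surface described above can be modeled like this (the names follow the PR, but the in-memory scope is illustrative only; the real implementation adds capacity limits, disk spill, and a suspend drain()):

```kotlin
// Illustrative model of the api/telemetry surface; not the PR's code.
data class StreamSignal(val tag: String, val data: Any?, val timestamp: Long)

// Anonymization hook: return a transformed signal, or null to drop it.
fun interface StreamSignalRedactor {
    fun redact(signal: StreamSignal): StreamSignal?
}

interface StreamTelemetryScope {
    val name: String
    fun emit(tag: String, data: Any?): Result<Unit>
    fun drain(): Result<List<StreamSignal>>
}

// Minimal in-memory scope; shown non-suspend for brevity.
class InMemoryScope(
    override val name: String,
    private val redactor: StreamSignalRedactor? = null,
) : StreamTelemetryScope {
    private val buffer = mutableListOf<StreamSignal>()

    override fun emit(tag: String, data: Any?): Result<Unit> = runCatching {
        val raw = StreamSignal(tag, data, System.currentTimeMillis())
        // A non-null redactor returning null drops the signal entirely.
        val signal = if (redactor == null) raw else redactor.redact(raw) ?: return@runCatching
        synchronized(buffer) { buffer.add(signal) }
        Unit
    }

    override fun drain(): Result<List<StreamSignal>> = runCatching {
        synchronized(buffer) {
            val snapshot = buffer.toList()
            buffer.clear()
            snapshot
        }
    }
}
```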

Internal layer (internal/telemetry/):

  • StreamTelemetryScopeImpl — buffer-swap thread safety, memory-first with disk spill on overflow, drops oldest under pressure
  • StreamTelemetryImpl — scope factory with stale version cleanup on init

Integration:

  • Added telemetry: StreamTelemetry? = null to StreamComponentProvider
  • When not provided, core uses StreamTelemetryNoOp (zero cost)

Design highlights:

  • Product SDK creates telemetry, injects into core, keeps reference for draining
  • Core emits freely, product SDK decides when/where to send
  • Disk storage versioned by SDK version — stale versions nuked on init, no migration
  • emit() never throws or affects the caller
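This ownership flow can be sketched end to end with simplified stand-ins for the PR's types: the product SDK owns the engine and drains it, while core only ever sees the interface and falls back to a no-op when nothing was injected.

```kotlin
// Simplified stand-ins for the injection flow above; not the PR's types.
data class Signal(val tag: String, val data: Any?)

interface Telemetry {
    fun emit(tag: String, data: Any?)
    fun drain(): List<Signal>
}

// Zero-cost default when the product SDK does not provide an engine.
object TelemetryNoOp : Telemetry {
    override fun emit(tag: String, data: Any?) = Unit
    override fun drain(): List<Signal> = emptyList()
}

// Core accepts an optional engine, mirroring the nullable field on
// StreamComponentProvider, and emits without caring who drains.
class Core(telemetry: Telemetry? = null) {
    private val telemetry = telemetry ?: TelemetryNoOp

    fun connect() {
        // emit never throws back into the caller
        runCatching { telemetry.emit("core.connect", "started") }
    }
}

// Product-side engine: buffers signals, drained on the product's schedule.
class ProductTelemetry : Telemetry {
    private val buffer = mutableListOf<Signal>()
    override fun emit(tag: String, data: Any?) {
        synchronized(buffer) { buffer.add(Signal(tag, data)) }
    }
    override fun drain(): List<Signal> = synchronized(buffer) {
        val out = buffer.toList()
        buffer.clear()
        out
    }
}
```

The product SDK creates a ProductTelemetry, passes it to Core, and later calls drain() on its own schedule; a Core constructed without an engine runs against the no-op.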

Testing

  • Unit tests for StreamTelemetryScopeImpl (emit, drain, spill, redaction, thread safety)
  • Unit tests for StreamTelemetryImpl (scope creation, version cleanup)
  • Integration with product SDK (Video/Chat) — emit + drain flow

pr: core

Summary by CodeRabbit

Release Notes

  • New Features
    • Added telemetry collection system with configurable disk and memory buffering for signal tracking
    • Telemetry can be fully disabled via no-op mode with minimal performance overhead
    • Support for signal redaction to exclude sensitive telemetry data
    • Automatic cleanup of stale telemetry data with versioning support

Introduce a general-purpose telemetry engine for collecting signals
across named scopes. Product SDKs inject it via StreamComponentProvider,
core emits internally, product drains on its own schedule.

- StreamSignal: tag + data + timestamp
- StreamTelemetryScope: emit() + drain() with buffer-swap thread safety
- StreamTelemetryScopeImpl: memory-first, disk spill on overflow, drop
  oldest under pressure
- StreamSignalRedactor: anonymization hook on emit()
- StreamTelemetryConfig: per-scope memory/disk caps, configurable paths,
  SDK-versioned disk layout with stale version cleanup
- StreamTelemetryNoOp: zero-cost default when telemetry not provided
- StreamComponentProvider: added telemetry field

github-actions Bot commented Apr 24, 2026

PR checklist ✅

All required conditions are satisfied:

  • Title length is OK (or ignored by label).
  • At least one pr: label exists.
  • Sections ### Goal, ### Implementation, and ### Testing are filled.

🎉 Great job! This PR is ready for review.

@aleksandar-apostolov aleksandar-apostolov changed the title feat(telemetry): add StreamTelemetry library feat(telemetry): add StreamTelemetry capabilities Apr 24, 2026
@aleksandar-apostolov aleksandar-apostolov changed the title feat(telemetry): add StreamTelemetry capabilities Add StreamTelemetry capabilities to be reused across products Apr 24, 2026
…or disk ops

- StreamSignal → api/model/telemetry/ (data models package)
- StreamTelemetryConfig → api/model/config/ (alongside other configs)
- drain() is now suspend, disk reads on Dispatchers.IO
- spillToDisk launches on Dispatchers.IO via scope
- cleanStaleVersions runs on Dispatchers.IO at init
- StreamTelemetryImpl and StreamTelemetryScopeImpl accept CoroutineScope
- drain() now returns Result<List<StreamSignal>> so product SDK can
  distinguish between empty (no signals) and failure (disk I/O error)
- Use runCatchingCancellable to properly rethrow CancellationException
- drainDisk() no longer swallows exceptions — they propagate to Result
77 tests covering:
- Basic emit/drain, ordering, timestamps
- Drain semantics (clear, idempotent, interleaved cycles)
- Redactor (transform, drop fallback, exception safety)
- Memory capacity boundaries (at, below, above, 0, 1, large)
- Disk spill (FIFO order, multiple spills, accumulation)
- Disk capacity boundaries (0, 1 byte, exact fit, rotation)
- Thread safety (concurrent emit, concurrent emit+drain)
- Disk I/O failures (read-only dir, corrupt files, external deletion)
- Corrupt spill files (blank lines, mixed valid/invalid, binary garbage,
  single-field lines, very large files)
- Spill file lifecycle (created on overflow, deleted after drain)
- Serialization edge cases (tabs, unicode, empty strings, long data,
  newline limitation, escape sequence ambiguity)
- Version cleanup (stale dirs deleted, current kept, siblings safe)
- Scope creation (identity, isolation, special chars, empty name)
- Custom config (root override, basePath)
- NoOp (emit discards, drain returns empty success)
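Several of those serialization edge cases stem from the line-oriented spill format. A self-contained sketch of such a codec (the delimiter and escape constants here are assumptions, not the PR's actual values) makes the newline limitation and escape ambiguity concrete:

```kotlin
// Sketch of a tab-delimited, one-signal-per-line spill codec like the one
// these tests exercise. DELIMITER / DELIMITER_ESCAPE are assumed values.
val DELIMITER = "\t"
val DELIMITER_ESCAPE = "\\t" // literal backslash + 't'

data class SpillSignal(val tag: String, val data: String?, val timestamp: Long)

fun encode(signal: SpillSignal): String {
    val tag = signal.tag.replace(DELIMITER, DELIMITER_ESCAPE)
    val data = signal.data?.replace(DELIMITER, DELIMITER_ESCAPE).orEmpty()
    // Newline limitation: a '\n' inside data would split the record.
    return "${signal.timestamp}$DELIMITER$tag$DELIMITER$data"
}

fun decode(line: String): SpillSignal? {
    if (line.isBlank()) return null
    val parts = line.split(DELIMITER, limit = 3)
    if (parts.size < 2) return null
    val timestamp = parts[0].toLongOrNull() ?: return null
    val tag = parts[1].replace(DELIMITER_ESCAPE, DELIMITER)
    val data = parts.getOrNull(2)?.replace(DELIMITER_ESCAPE, DELIMITER)?.ifEmpty { null }
    return SpillSignal(tag, data, timestamp)
}
```

The escape-sequence ambiguity follows directly: after decode, a tag that contained a literal backslash-t is indistinguishable from one that contained a real tab.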

Fix: replace List.removeFirst() with removeAt(0) for JDK < 21 compat
- Add Mutex for disk I/O — serializes spillToDisk and drainDisk to
  prevent concurrent read/write on the same spill file
- emit() returns Result<Unit> via runCatching
- drain() already returns Result<List<StreamSignal>> via
  runCatchingCancellable
- spillToDisk uses runCatchingCancellable instead of try/catch
- Every public method now returns Result<*> for API consistency
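runCatchingCancellable is the project's own helper; its conventional shape (an assumption about this codebase's exact signature) is a runCatching that refuses to swallow cancellation:

```kotlin
import kotlin.coroutines.cancellation.CancellationException

// Conventional shape of a cancellation-aware runCatching, matching the
// behavior described above: CancellationException is rethrown so
// structured concurrency keeps working; all other failures become Result.
inline fun <T> runCatchingCancellable(block: () -> T): Result<T> =
    try {
        Result.success(block())
    } catch (e: CancellationException) {
        throw e // cancellation is not a failure; let it propagate
    } catch (e: Throwable) {
        Result.failure(e)
    }
```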
5 new tests exercising the disk Mutex under concurrent access:
- drain while spill is in flight
- rapid spill cycles with memoryCapacity=1
- concurrent drain and emit with disk overflow + no-duplicate check
- multiple coroutines draining same scope concurrently
- spill and drain interleaved rapidly over 50 cycles
@aleksandar-apostolov aleksandar-apostolov marked this pull request as ready for review April 24, 2026 08:57
- cleanStaleVersions() is now a public suspend method on StreamTelemetry
- Product SDK calls it when ready, not auto-fired in init
- Removed hardcoded Dispatchers.IO from StreamTelemetryImpl
- Uses runCatchingCancellable, returns Result<Unit>
- Updated tests to call cleanStaleVersions() explicitly
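A dependency-free sketch of what such a cleanup does (the root/version directory layout is an assumption; in the PR this is a public suspend method returning Result<Unit>, with the caller choosing the dispatcher):

```kotlin
import java.io.File

// Sketch of explicit stale-version cleanup: every sibling version
// directory except the current one is deleted, no migration attempted.
// Suspension and dispatcher injection are omitted to stay dependency-free.
fun cleanStaleVersions(root: File, currentVersion: String): Result<Unit> = runCatching {
    root.listFiles()
        ?.filter { it.isDirectory && it.name != currentVersion }
        ?.forEach { it.deleteRecursively() }
    Unit
}
```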

coderabbitai Bot commented Apr 24, 2026

Walkthrough

This PR introduces a comprehensive telemetry system to stream-android-core, enabling the capture, buffering, and draining of telemetry signals with configurable memory and disk storage, signal redaction, and a no-op fallback implementation.

Changes

Cohort / File(s) Summary

Configuration & Models (StreamComponentProvider.kt, StreamTelemetryConfig.kt, StreamSignal.kt):
Added an optional telemetry dependency to StreamComponentProvider and introduced the StreamTelemetryConfig data class with configurable disk/memory capacities, versioning, and path management. The new StreamSignal data class captures telemetry units with tag, data payload, and timestamp.

Telemetry Interfaces (StreamTelemetry.kt, StreamTelemetryScope.kt, StreamSignalRedactor.kt):
Defined the StreamTelemetry interface and factory function, the StreamTelemetryScope interface for signal grouping with emit/drain capabilities, and the StreamSignalRedactor functional interface for signal transformation. Includes the StreamTelemetryNoOp singleton for no-op behavior.

Telemetry Implementation (StreamTelemetryImpl.kt, StreamTelemetryScopeImpl.kt):
Implemented StreamTelemetryImpl with version-based directory cleanup and per-scope memoization, and StreamTelemetryScopeImpl with synchronized in-memory buffering, asynchronous disk spilling when memory capacity is exceeded, disk trimming under capacity constraints, and atomic drain operations preserving FIFO order across the memory and disk tiers.

Test Coverage (StreamTelemetryImplTest.kt, StreamTelemetryScopeImplTest.kt):
Added comprehensive test suites validating scope creation/reuse, initialization cleanup logic, custom configuration paths, signal emission/draining, redaction behavior, concurrent operations, memory/disk boundary conditions, FIFO ordering, disk persistence, and robustness under I/O failures and disk interference scenarios.

Sequence Diagram(s)

sequenceDiagram
    participant App as Application
    participant Tel as StreamTelemetry
    participant Scope as StreamTelemetryScope
    participant Mem as Memory Buffer
    participant Disk as Disk Storage

    App->>Tel: scope(name)
    activate Tel
    Tel->>Scope: create or retrieve
    deactivate Tel
    
    loop Emit Phase
        App->>Scope: emit(tag, data)
        activate Scope
        Scope->>Scope: apply redactor
        Scope->>Mem: add signal
        alt Memory at capacity
            Scope->>Disk: spill oldest signals (async)
        end
        Scope->>App: return Result<Unit>
        deactivate Scope
    end
    
    App->>Scope: drain()
    activate Scope
    Scope->>Disk: read and delete spill file
    Scope->>Mem: snapshot and clear buffer
    Scope->>Scope: merge disk + memory signals
    Scope->>App: return Result<List<StreamSignal>>
    deactivate Scope

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐰 Signals hop and skip with glee,
Through memory lanes and disk spree,
With scopes and redacts, none shall flee,
Telemetry blooms wild and free! ✨📊

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Docstring Coverage: ⚠️ Warning. Docstring coverage is 30.19%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.
✅ Passed checks (4 passed)
Title check: ✅ Passed. The title clearly and concisely describes the main feature addition of introducing a reusable telemetry engine for product SDKs.
Description check: ✅ Passed. The description covers Goal, Implementation, and Testing, but the Testing section is incomplete, with unchecked boxes and no concrete test implementation details.
Linked Issues check: ✅ Passed. Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check: ✅ Passed. Check skipped because no linked issues were found for this pull request.



@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 7

🧹 Nitpick comments (2)
stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt (1)

337-352: Assertion is conditional on spillFile.exists() and trivially passes if no spill occurred.

If, for any reason (timing, impl change), the spill never lands on disk by the time waitForSpill() returns, the if (spillFile.exists()) branch is skipped and the test passes without exercising rotation at all. Since the test's purpose is disk rotation under capacity pressure, make the existence of the spill file a precondition:

🔧 Suggested tightening
         val spillFile = File(dir, "spill.bin")
-        if (spillFile.exists()) {
-            assertTrue(
-                "Spill file should be around disk capacity, was ${spillFile.length()}",
-                spillFile.length() <= 150,
-            )
-        }
+        assertTrue("Spill file must exist to test rotation", spillFile.exists())
+        assertTrue(
+            "Spill file should be around disk capacity, was ${spillFile.length()}",
+            spillFile.length() <= 150,
+        )

The same pattern appears in disk capacity of 1 byte trims aggressively (lines 836-842) and disk capacity exactly matches one signal line (lines 853-856) — consider applying the same change there.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt`
around lines 337 - 352, The test disk rotation assertion currently skips
verification if the spill file doesn't exist; change the test `disk rotation
drops oldest when capacity exceeded` to require the spill file's existence after
calling `waitForSpill()`—replace the conditional `if (spillFile.exists())` with
an explicit assertion that `spillFile.exists()` is true, then perform the length
assertion against `spillFile.length()`; do the same tightening for the other
tests mentioned (`disk capacity of 1 byte trims aggressively` and `disk capacity
exactly matches one signal line`) so they assert the spill file exists before
checking its size, and keep use of `createScope(...)`, `waitForSpill()`,
`memoryCapacity`, and `diskCapacity` unchanged.
stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt (1)

128-141: Consider replacing runBlocking + Thread.sleep with runTest and a controllable dispatcher.

The init cleanup runs on Dispatchers.Default via the injected scope, and tests wait a fixed 500ms. On slower CI agents this can be racy (flaky failures) or unnecessarily slow when cleanup finishes in 1 ms. As per coding guidelines (use MockK and coroutines-test to control timing), prefer injecting a StandardTestDispatcher/UnconfinedTestDispatcher and using runTest { … advanceUntilIdle() }, or expose a Job from the cleanup so tests can join() deterministically. This comment applies to every runBlocking { … Thread.sleep(500) } usage in both telemetry test files.

As per coding guidelines: "Use MockK and coroutines-test to control timing".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt`
around lines 128 - 141, The test uses runBlocking + Thread.sleep to wait for
async cleanup started by StreamTelemetryImpl, which is flaky; change the tests
to use kotlinx.coroutines.test runTest with a
StandardTestDispatcher/UnconfinedTestDispatcher and inject a
TestScope/TestDispatcher into StreamTelemetryImpl (or expose the cleanup Job) so
you can deterministically wait: construct StreamTelemetryImpl(context,
config(version = "1.0.0"), testScope) then call advanceUntilIdle() (or join the
exposed Job) instead of Thread.sleep; apply the same pattern to all telemetry
tests that use runBlocking + Thread.sleep.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt`:
- Around line 149-168: encode(...) currently calls signal.data?.toString() so
decode(...) always yields StreamSignal.data as String?, causing memory vs disk
round-trip type inconsistency; update the public API docs: add KDoc to
StreamSignal.data (and to emit(...) / drain(...) where relevant) explicitly
stating that spills to disk coerce data to String? and callers must
pre-serialize or expect String, and include a note next steps (optionally change
StreamSignal.data to String? or replace encode/decode with a proper serializer
such as Moshi to preserve types); ensure you reference StreamSignal.data,
encode, and decode in the KDoc so callers see the behavior.
- Around line 74-91: The emit method in StreamTelemetryScopeImpl currently uses
"val signal = redactor?.redact(raw) ?: raw" which accidentally re-inserts
unredacted `raw` when a non-null redactor intentionally returns null to drop a
signal; change this to call redact and if the result is null, exit without
buffering (i.e., do not add to buffer or trigger spilling). Specifically, in
emit(), replace the Elvis expression with a nullable result from
StreamSignalRedactor.redact(raw) and if that result is null, return/skip the
rest of the synchronized block so the signal is dropped rather than buffered.
- Around line 125-134: trimDiskIfNeeded currently removes one line and rewrites
the whole file repeatedly causing O(N²) disk I/O; change it to compute which
suffix of lines fits within diskCapacity once and then write that suffix in a
single file.writeText call. Specifically, in trimDiskIfNeeded: read all lines
into a list, iterate from the end accumulating the byte size of each line
(including the separator/newline) until the cumulative size <= diskCapacity,
then create the targetSlice as the tail subset that fits and call
file.writeText(targetSlice.joinToString("\n", postfix = "\n")) once (do not
re-check file.length() inside the loop). Ensure you reference the function name
trimDiskIfNeeded and use the same line-reading approach but only perform one
write.

In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt`:
- Around line 197-209: The test computes expectedDir but never verifies the
telemetry uses the custom root; update the test for StreamTelemetryImpl by
forcing a spill (e.g., emit enough events or call the telemetry scope's
flush/spill method after creating telemetryScope via
StreamTelemetryImpl.scope("test") and emitting an event) and then assert that
the actual spill path (the directory or file produced by that spill) is under
the computed expectedDir (File(customRoot, "stream/telemetry/2.0.0/test")), or
alternatively mirror the approach from the "custom basePath is used" test to
assert version cleanup/paths use config(root = customRoot) instead of cacheDir.

In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt`:
- Around line 250-257: Update the test `redactor exception does not crash emit`
to actually assert the emit result and downstream behavior: call
`sut.emit("event", null)` and assert the returned Result is successful (e.g.,
result.isSuccess), then call `sut.drain()` and assert the drained signal is the
raw/fallback signal (matching the behavior tested in the `redactor returning
null falls back to raw signal` test). Use the existing `StreamSignalRedactor`
that throws and the `createScope` helper to locate the code paths
(`StreamSignalRedactor`, `createScope`, `sut.emit`, `sut.drain`) and ensure the
test fails if emit silently did nothing.
- Around line 311-317: The current FIFO assertion is weak because
tags.indexOf("a") can be -1; update the test around sut.drain().getOrThrow() and
tags so you first assert required disk and memory tags exist (e.g., ensure
tags.contains("a") and tags.contains("d")), then strengthen the FIFO check by
computing all disk-tag indices and all memory-tag indices and asserting that the
maximum disk index is less than the minimum memory index (ensuring every disk
signal precedes every memory signal) instead of only comparing tags.indexOf("a")
and tags.indexOf("d").
- Around line 586-597: The two tests 'emit does not throw when spill directory
is read-only' and 'emit survives when spill dir is read-only' are flaky because
File.setReadOnly() can be ignored by root; replace the unreliable permission
approach with a deterministic write barrier: create a real file and pass
File(existingFile, "nested") as the spill dir so mkdirs() will always fail, or
add an early guard using Assume.assumeFalse(isRunningAsRoot()) to skip the test
under root; update the test setup in the methods that call
createScope(memoryCapacity = ..., dir = ...) to use the existing-file-parent
pattern (or the Assume guard) so the spill path is exercised consistently.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 8fd9783f-e9a4-42fb-a690-65c38324f54e

📥 Commits

Reviewing files that changed from the base of the PR and between 332de07 and 92cccce.

📒 Files selected for processing (10)
  • stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt
  • stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamTelemetryConfig.kt
  • stream-android-core/src/main/java/io/getstream/android/core/api/model/telemetry/StreamSignal.kt
  • stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamSignalRedactor.kt
  • stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt
  • stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt
  • stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImpl.kt
  • stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt
  • stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt
  • stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt

Comment on lines +74 to +91
override fun emit(tag: String, data: Any?): Result<Unit> =
    // runCatching (not cancellable) — emit is not a suspend function.
    runCatching {
        val raw = StreamSignal(tag = tag, data = data, timestamp = System.currentTimeMillis())
        val signal = redactor?.redact(raw) ?: raw
        synchronized(lock) {
            buffer.add(signal)
            if (buffer.size > memoryCapacity) {
                if (spilling.compareAndSet(false, true)) {
                    val snapshot = buffer
                    buffer = mutableListOf()
                    scope.launch(Dispatchers.IO) { spillToDisk(snapshot) }
                } else {
                    buffer.removeAt(0)
                }
            }
        }
    }

⚠️ Potential issue | 🔴 Critical

Redactor null result is not honored — signal leaks through unredacted.

Per the StreamSignalRedactor KDoc, returning null from redact(...) should drop the signal entirely. The current expression uses Elvis as a fallback when the redactor itself is null, but as written it also collapses the intentional "drop" case into the unredacted raw signal:

redactor = someRedactor       // configured
someRedactor.redact(raw)      // returns null to drop
result                        // ends up being raw, not dropped

This both (a) violates the documented contract and (b) is a potential privacy regression — a redactor explicitly asking to drop a signal actually buffers the unredacted payload.

🛠️ Proposed fix
         runCatching {
             val raw = StreamSignal(tag = tag, data = data, timestamp = System.currentTimeMillis())
-            val signal = redactor?.redact(raw) ?: raw
+            val signal =
+                if (redactor == null) raw
+                else redactor.redact(raw) ?: return@runCatching // null means "drop"
             synchronized(lock) {
                 buffer.add(signal)

Comment on lines +125 to +134
private fun trimDiskIfNeeded(file: File) {
    if (!file.exists() || file.length() <= diskCapacity) {
        return
    }
    val lines = file.readLines().toMutableList()
    while (lines.isNotEmpty() && file.length() > diskCapacity) {
        lines.removeAt(0)
        file.writeText(lines.joinToString(separator = "\n", postfix = "\n"))
    }
}

⚠️ Potential issue | 🟠 Major

trimDiskIfNeeded is O(N²) in disk I/O — rewrites entire file per dropped line.

On each iteration the loop removes one line from the in-memory list and then rewrites the full file via file.writeText(...). Under disk pressure (many signals to drop at once) this causes N rewrites of an up-to-diskCapacity-bytes file. The file.length() comparison inside the loop also re-stats after every rewrite.

Compute the target slice once (based on cumulative byte size from the tail) and do a single write. E.g.:

♻️ Proposed refactor
-    private fun trimDiskIfNeeded(file: File) {
-        if (!file.exists() || file.length() <= diskCapacity) {
-            return
-        }
-        val lines = file.readLines().toMutableList()
-        while (lines.isNotEmpty() && file.length() > diskCapacity) {
-            lines.removeAt(0)
-            file.writeText(lines.joinToString(separator = "\n", postfix = "\n"))
-        }
-    }
+    private fun trimDiskIfNeeded(file: File) {
+        if (!file.exists() || file.length() <= diskCapacity) return
+        val lines = file.readLines()
+        // Keep the newest suffix that fits within diskCapacity (accounting for "\n" per line).
+        var bytes = 0L
+        var startIdx = lines.size
+        for (i in lines.indices.reversed()) {
+            val next = bytes + lines[i].toByteArray(Charsets.UTF_8).size + 1
+            if (next > diskCapacity) break
+            bytes = next
+            startIdx = i
+        }
+        val kept = lines.subList(startIdx, lines.size)
+        file.writeText(if (kept.isEmpty()) "" else kept.joinToString("\n", postfix = "\n"))
+    }

Comment on lines +149 to +168
private fun encode(signal: StreamSignal): String {
    val escapedTag = signal.tag.replace(DELIMITER, DELIMITER_ESCAPE)
    val escapedData = signal.data?.toString()?.replace(DELIMITER, DELIMITER_ESCAPE).orEmpty()
    return "${signal.timestamp}$DELIMITER$escapedTag$DELIMITER$escapedData"
}

@Suppress("ReturnCount")
private fun decode(line: String): StreamSignal? {
    if (line.isBlank()) {
        return null
    }
    val parts = line.split(DELIMITER, limit = 3)
    if (parts.size < 2) {
        return null
    }
    val timestamp = parts[0].toLongOrNull() ?: return null
    val tag = parts[1].replace(DELIMITER_ESCAPE, DELIMITER)
    val data = parts.getOrNull(2)?.replace(DELIMITER_ESCAPE, DELIMITER)?.ifEmpty { null }
    return StreamSignal(tag = tag, data = data, timestamp = timestamp)
}

⚠️ Potential issue | 🟡 Minor

Disk round-trip silently coerces non-String payloads to String.

signal.data?.toString() flattens any object payload to its string representation on spill. decode(...) then produces StreamSignal.data of type String?. This yields an observable behavior difference between the same signal drained from memory (original type preserved) vs. drained after spill (now a String). Consumers comparing is/as or deserializing the payload downstream will see inconsistent types depending on buffer pressure — an easy source of flaky production bugs.

Options, in order of preference:

  1. Document the contract explicitly: "StreamSignal.data is a String? after any drain() that went through disk" and advise emitters to pre-serialize.
  2. Constrain StreamSignal.data to String? at the API layer so all drains are consistent.
  3. Use a proper serializer (e.g. Moshi @JsonClass(generateAdapter = true)) for both tiers so memory vs. disk are indistinguishable.

At minimum, this deserves a KDoc note on StreamSignal.data and/or emit(...).


Comment on lines +197 to +209
@Test
fun `custom root overrides cacheDir`() = runBlocking {
val customRoot = tempDir.newFolder("custom-root")
val sut = StreamTelemetryImpl(context, config(version = "2.0.0", root = customRoot), scope)

val telemetryScope = sut.scope("test")
telemetryScope.emit("event", null)

// Spill dir should be under custom root
val expectedDir = File(customRoot, "stream/telemetry/2.0.0/test")
// Even if no spill happened, the scope should use this path
assertEquals("test", telemetryScope.name)
}

⚠️ Potential issue | 🟡 Minor

Test does not verify that the custom root is actually honored.

expectedDir is computed but never asserted on, and the only assertEquals checks telemetryScope.name == "test" — which holds for any scope regardless of root. As written, this test would still pass if config.root were silently ignored. Either trigger a spill and assert the spill file lands under expectedDir (and not under cacheDir), or assert that version cleanup uses customRoot (mirroring the `custom basePath is used` test).

🔧 Suggested tightening
     @Test
     fun `custom root overrides cacheDir`() = runBlocking {
         val customRoot = tempDir.newFolder("custom-root")
-        val sut = StreamTelemetryImpl(context, config(version = "2.0.0", root = customRoot), scope)
-
-        val telemetryScope = sut.scope("test")
-        telemetryScope.emit("event", null)
-
-        // Spill dir should be under custom root
-        val expectedDir = File(customRoot, "stream/telemetry/2.0.0/test")
-        // Even if no spill happened, the scope should use this path
-        assertEquals("test", telemetryScope.name)
+        // Seed a stale version under the custom root to verify cleanup targets it
+        // (and not cacheDir).
+        val staleUnderCustomRoot = File(customRoot, "stream/telemetry/0.9.0")
+        staleUnderCustomRoot.mkdirs()
+        val cacheDirTelemetry = File(cacheDir, "stream/telemetry/0.9.0")
+        cacheDirTelemetry.mkdirs()
+
+        StreamTelemetryImpl(context, config(version = "2.0.0", root = customRoot), scope)
+        Thread.sleep(500)
+
+        assertFalse("Stale version under custom root should be deleted", staleUnderCustomRoot.exists())
+        assertTrue("cacheDir should not be touched when custom root is set", cacheDirTelemetry.exists())
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt`
around lines 197 - 209, The test computes expectedDir but never verifies the
telemetry uses the custom root; update the test for StreamTelemetryImpl by
forcing a spill (e.g., emit enough events or call the telemetry scope's
flush/spill method after creating telemetryScope via
StreamTelemetryImpl.scope("test") and emitting an event) and then assert that
the actual spill path (the directory or file produced by that spill) is under
the computed expectedDir (File(customRoot, "stream/telemetry/2.0.0/test")), or
alternatively mirror the approach from the "custom basePath is used" test to
assert version cleanup/paths use config(root = customRoot) instead of cacheDir.

Comment on lines +250 to +257
@Test
fun `redactor exception does not crash emit`() {
val redactor = StreamSignalRedactor { throw RuntimeException("redactor boom") }
val sut = createScope(redactor = redactor)

sut.emit("event", null)
// No exception thrown
}

⚠️ Potential issue | 🟡 Minor

Test has no assertion — a silently-failed emit would not be caught.

Since emit returns Result<Unit> (it never throws), the only thing this test currently verifies is that the method doesn't throw synchronously — which the API already guarantees. To actually exercise the redactor failure path, assert that emit returned success and that a subsequent drain still works (and ideally yields the raw signal as a fallback, matching the `redactor returning null falls back to raw signal` test).

🔧 Suggested tightening
     @Test
-    fun `redactor exception does not crash emit`() {
+    fun `redactor exception does not crash emit`() = runBlocking {
         val redactor = StreamSignalRedactor { throw RuntimeException("redactor boom") }
         val sut = createScope(redactor = redactor)
 
-        sut.emit("event", null)
-        // No exception thrown
+        val emitResult = sut.emit("event", null)
+        assertTrue("emit should not surface redactor failure", emitResult.isSuccess)
+
+        // drain should still succeed; signal either falls back to raw or is dropped,
+        // depending on the documented contract.
+        val drainResult = sut.drain()
+        assertTrue(drainResult.isSuccess)
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt`
around lines 250 - 257, Update the test `redactor exception does not crash emit`
to actually assert the emit result and downstream behavior: call
`sut.emit("event", null)` and assert the returned Result is successful (e.g.,
result.isSuccess), then call `sut.drain()` and assert the drained signal is the
raw/fallback signal (matching the behavior tested in the `redactor returning
null falls back to raw signal` test). Use the existing `StreamSignalRedactor`
that throws and the `createScope` helper to locate the code paths
(`StreamSignalRedactor`, `createScope`, `sut.emit`, `sut.drain`) and ensure the
test fails if emit silently did nothing.

Comment on lines +311 to +317
val signals = sut.drain().getOrThrow()
val tags = signals.map { it.tag }
assertTrue(
"Expected disk signals before memory, got: $tags",
tags.indexOf("a") < tags.indexOf("d"),
)
}

⚠️ Potential issue | 🟡 Minor

FIFO assertion is too weak and can pass even when "a" was dropped.

tags.indexOf("a") returns -1 when "a" isn't present, so -1 < tags.indexOf("d") evaluates to true whenever "d" is present — the test would silently pass even if the disk-before-memory invariant is broken and the oldest signal was lost. It also only checks a weak `a`-before-`d` relation, not that every disk signal precedes every memory signal, as the test name claims.

🔧 Suggested stronger assertion
-        val signals = sut.drain().getOrThrow()
-        val tags = signals.map { it.tag }
-        assertTrue(
-            "Expected disk signals before memory, got: $tags",
-            tags.indexOf("a") < tags.indexOf("d"),
-        )
+        val tags = sut.drain().getOrThrow().map { it.tag }
+        assertEquals(listOf("a", "b", "c", "d"), tags)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt`
around lines 311 - 317, The current FIFO assertion is weak because
tags.indexOf("a") can be -1; update the test around sut.drain().getOrThrow() and
tags so you first assert required disk and memory tags exist (e.g., ensure
tags.contains("a") and tags.contains("d")), then strengthen the FIFO check by
computing all disk-tag indices and all memory-tag indices and asserting that the
maximum disk index is less than the minimum memory index (ensuring every disk
signal precedes every memory signal) instead of only comparing tags.indexOf("a")
and tags.indexOf("d").

Comment on lines +586 to +597
fun `emit does not throw when spill directory is read-only`() = runBlocking {
val readOnlyDir = tempDir.newFolder("readonly")
readOnlyDir.setReadOnly()

val sut = createScope(memoryCapacity = 2, dir = File(readOnlyDir, "nested"))

repeat(5) { sut.emit("event-$it", null) }
waitForSpill()

val result = sut.drain()
assertTrue("drain should succeed even after spill failure", result.isSuccess)
}

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

Does java.io.File.setReadOnly() on a directory prevent creation of new files inside it on Linux when the process runs as root?

💡 Result:

No, java.io.File.setReadOnly on a directory does not prevent creation of new files inside it on Linux when the process runs as root.

🏁 Script executed:

# Find and examine the test file
find . -name "StreamTelemetryScopeImplTest.kt" -type f

Repository: GetStream/stream-core-android

Length of output: 186


🏁 Script executed:

# Once found, examine the test code at the specified lines
head -n 620 ./stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt | tail -n 40

Repository: GetStream/stream-core-android

Length of output: 1372


🏁 Script executed:

# Also check the second test mentioned (lines 897-917)
sed -n '885,930p' ./stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt

Repository: GetStream/stream-core-android

Length of output: 1493


🏁 Script executed:

# Find the StreamTelemetryScopeImpl implementation
find . -name "StreamTelemetryScopeImpl.kt" -type f

Repository: GetStream/stream-core-android

Length of output: 182


🏁 Script executed:

# Check the implementation around spill directory creation
grep -n "spillDir\|mkdirs\|mkdir" ./stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt | head -20

Repository: GetStream/stream-core-android

Length of output: 197


🏁 Script executed:

# Look at how emit handles spill failures
sed -n '1,100p' ./stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt

Repository: GetStream/stream-core-android

Length of output: 4013


🏁 Script executed:

# Get the spillToDisk method to see how it handles failures
sed -n '100,200p' ./stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt

Repository: GetStream/stream-core-android

Length of output: 2790


🏁 Script executed:

# Get the drainDisk method
sed -n '200,300p' ./stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt

Repository: GetStream/stream-core-android

Length of output: 55


🏁 Script executed:

# Check if there are environment/permission checks elsewhere in the test
grep -n "root\|setReadOnly\|permission\|Assume" ./stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt

Repository: GetStream/stream-core-android

Length of output: 335


Both tests using setReadOnly() are unreliable when running as root—they may pass without exercising the intended failure path.

File.setReadOnly() does not prevent directory/file creation on Linux when the process runs as root; it only clears the write bit for the owner, which root ignores. Since spillDir.mkdirs() catches all exceptions and the implementation accepts spill failures gracefully (resetting the spilling flag and returning success from drain() regardless), these tests will silently succeed with a successful spill rather than validating resilience to permission failures.

Use a deterministic write barrier instead: make the parent directory a regular file (e.g., File(existingFile, "nested") where existingFile is a real file), so mkdirs() must fail regardless of uid. Alternatively, guard with Assume.assumeFalse(isRunningAsRoot).

Applies to:

  • Lines 586-597: emit does not throw when spill directory is read-only
  • Lines 897-917: emit survives when spill dir is read-only
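The deterministic write barrier can be sketched as follows — createScope, waitForSpill, and tempDir are the test file's existing helpers, and the test name is illustrative:

```kotlin
// Sketch: use a regular file as a parent path component of the spill dir,
// so mkdirs() fails deterministically — even when the process runs as root,
// since no uid can create a directory under an existing regular file.
@Test
fun `emit does not throw when spill dir cannot be created`() = runBlocking {
    val blockingFile = tempDir.newFile("not-a-directory") // a plain file

    // File(blockingFile, "nested").mkdirs() must return false on every OS.
    val sut = createScope(memoryCapacity = 2, dir = File(blockingFile, "nested"))

    repeat(5) { sut.emit("event-$it", null) }
    waitForSpill()

    assertTrue("drain should succeed after spill failure", sut.drain().isSuccess)
}
```

Unlike setReadOnly(), this barrier does not depend on the caller's uid or the filesystem's permission semantics, so the spill-failure path is exercised on every run.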
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt`
around lines 586 - 597, The two tests 'emit does not throw when spill directory
is read-only' and 'emit survives when spill dir is read-only' are flaky because
File.setReadOnly() can be ignored by root; replace the unreliable permission
approach with a deterministic write barrier: create a real file and pass
File(existingFile, "nested") as the spill dir so mkdirs() will always fail, or
add an early guard using Assume.assumeFalse(isRunningAsRoot()) to skip the test
under root; update the test setup in the methods that call
createScope(memoryCapacity = ..., dir = ...) to use the existing-file-parent
pattern (or the Assume guard) so the spill path is exercised consistently.

@sonarqubecloud

Quality Gate failed

Failed conditions
B Reliability Rating on New Code (required ≥ A)

See analysis details on SonarQube Cloud

Labels

pr:new-feature New feature

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant