Add StreamTelemetry capabilities to be reused across products #59

aleksandar-apostolov wants to merge 7 commits into develop
Introduce a general-purpose telemetry engine for collecting signals across named scopes. Product SDKs inject it via StreamComponentProvider, core emits internally, and the product drains on its own schedule.

- StreamSignal: tag + data + timestamp
- StreamTelemetryScope: emit() + drain() with buffer-swap thread safety
- StreamTelemetryScopeImpl: memory-first, disk spill on overflow, drop oldest under pressure
- StreamSignalRedactor: anonymization hook on emit()
- StreamTelemetryConfig: per-scope memory/disk caps, configurable paths, SDK-versioned disk layout with stale-version cleanup
- StreamTelemetryNoOp: zero-cost default when telemetry is not provided
- StreamComponentProvider: added telemetry field
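The surface described above can be sketched as follows. This is a hypothetical shape inferred from the PR description; the names follow the text, but exact modifiers, defaults, and signatures are assumptions, not the repo's verbatim API.

```kotlin
// Illustrative sketch of the telemetry surface described in this PR.
// Field and method names follow the description; details are assumptions.
data class StreamSignal(
    val tag: String,     // what happened, e.g. "call.started"
    val data: Any?,      // optional payload
    val timestamp: Long, // epoch millis at emit time
)

interface StreamTelemetryScope {
    val name: String
    fun emit(tag: String, data: Any?): Result<Unit>  // buffer a signal
    suspend fun drain(): Result<List<StreamSignal>>  // take and clear buffered signals
}

// Product-side usage: core emits internally, product drains on its own schedule.
// val scope = telemetry.scope("video")
// scope.emit("call.started", null)
// val signals = scope.drain().getOrDefault(emptyList())
```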
PR checklist ✅ All required conditions are satisfied.
🎉 Great job! This PR is ready for review.
StreamTelemetry capabilities to be reused across products
…or disk ops:

- StreamSignal → api/model/telemetry/ (data models package)
- StreamTelemetryConfig → api/model/config/ (alongside other configs)
- drain() is now suspend; disk reads run on Dispatchers.IO
- spillToDisk launches on Dispatchers.IO via scope
- cleanStaleVersions runs on Dispatchers.IO at init
- StreamTelemetryImpl and StreamTelemetryScopeImpl accept a CoroutineScope
- drain() now returns Result<List<StreamSignal>> so the product SDK can distinguish between empty (no signals) and failure (disk I/O error)
- Use runCatchingCancellable to properly rethrow CancellationException
- drainDisk() no longer swallows exceptions; they propagate to the Result
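The runCatchingCancellable helper mentioned above follows a well-known pattern: behave like runCatching, but never swallow CancellationException, so structured-concurrency cancellation still propagates. A minimal sketch of that pattern (not necessarily the repo's exact implementation):

```kotlin
import kotlin.coroutines.cancellation.CancellationException

// Like runCatching, but rethrows CancellationException instead of wrapping it,
// so a cancelled coroutine is not mistaken for an ordinary failure.
inline fun <T> runCatchingCancellable(block: () -> T): Result<T> =
    try {
        Result.success(block())
    } catch (e: CancellationException) {
        throw e // cancellation must propagate, never become Result.failure
    } catch (e: Throwable) {
        Result.failure(e)
    }
```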
77 tests covering:

- Basic emit/drain, ordering, timestamps
- Drain semantics (clear, idempotent, interleaved cycles)
- Redactor (transform, drop fallback, exception safety)
- Memory capacity boundaries (at, below, above, 0, 1, large)
- Disk spill (FIFO order, multiple spills, accumulation)
- Disk capacity boundaries (0, 1 byte, exact fit, rotation)
- Thread safety (concurrent emit, concurrent emit+drain)
- Disk I/O failures (read-only dir, corrupt files, external deletion)
- Corrupt spill files (blank lines, mixed valid/invalid, binary garbage, single-field lines, very large files)
- Spill file lifecycle (created on overflow, deleted after drain)
- Serialization edge cases (tabs, unicode, empty strings, long data, newline limitation, escape sequence ambiguity)
- Version cleanup (stale dirs deleted, current kept, siblings safe)
- Scope creation (identity, isolation, special chars, empty name)
- Custom config (root override, basePath)
- NoOp (emit discards, drain returns empty success)

Fix: replace List.removeFirst() with removeAt(0) for JDK < 21 compatibility.
- Add a Mutex for disk I/O; serializes spillToDisk and drainDisk to prevent concurrent read/write on the same spill file
- emit() returns Result<Unit> via runCatching
- drain() already returns Result<List<StreamSignal>> via runCatchingCancellable
- spillToDisk uses runCatchingCancellable instead of try/catch
- Every public method now returns Result<*> for API consistency
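The Mutex idea above can be illustrated with a simplified stand-in (class name and file layout are assumptions, not the repo's code): spill and drain take the same lock, so a drain can never observe a half-written spill file, and two spills never interleave their writes.

```kotlin
import java.io.File
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Simplified sketch of Mutex-serialized disk access for a spill file.
class SpillFile(private val file: File) {
    private val diskMutex = Mutex()

    suspend fun spill(lines: List<String>) = diskMutex.withLock {
        // Append; a concurrent drain cannot run until this write completes.
        file.appendText(lines.joinToString("\n", postfix = "\n"))
    }

    suspend fun drain(): List<String> = diskMutex.withLock {
        if (!file.exists()) return@withLock emptyList()
        val lines = file.readLines()
        file.delete() // drained signals must not be re-read
        lines
    }
}
```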
5 new tests exercising the disk Mutex under concurrent access:

- drain while a spill is in flight
- rapid spill cycles with memoryCapacity = 1
- concurrent drain and emit with disk overflow and a no-duplicate check
- multiple coroutines draining the same scope concurrently
- spill and drain interleaved rapidly over 50 cycles
- cleanStaleVersions() is now a public suspend method on StreamTelemetry
- The product SDK calls it when ready; it is no longer auto-fired in init
- Removed the hardcoded Dispatchers.IO from StreamTelemetryImpl
- Uses runCatchingCancellable and returns Result<Unit>
- Updated tests to call cleanStaleVersions() explicitly
Walkthrough

This PR introduces a comprehensive telemetry system to stream-android-core, enabling the capture, buffering, and draining of telemetry signals with configurable memory and disk storage, signal redaction, and a no-op fallback implementation.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant App as Application
    participant Tel as StreamTelemetry
    participant Scope as StreamTelemetryScope
    participant Mem as Memory Buffer
    participant Disk as Disk Storage
    App->>Tel: scope(name)
    activate Tel
    Tel->>Scope: create or retrieve
    deactivate Tel
    loop Emit Phase
        App->>Scope: emit(tag, data)
        activate Scope
        Scope->>Scope: apply redactor
        Scope->>Mem: add signal
        alt Memory at capacity
            Scope->>Disk: spill oldest signals (async)
        end
        Scope->>App: return Result<Unit>
        deactivate Scope
    end
    App->>Scope: drain()
    activate Scope
    Scope->>Disk: read and delete spill file
    Scope->>Mem: snapshot and clear buffer
    Scope->>Scope: merge disk + memory signals
    Scope->>App: return Result<List<StreamSignal>>
    deactivate Scope
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 4 passed, ❌ 1 failed (1 warning)
Actionable comments posted: 7
🧹 Nitpick comments (2)
stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt (1)
337-352: Assertion is conditional on `spillFile.exists()` and trivially passes if no spill occurred.

If, for any reason (timing, impl change), the spill never lands on disk by the time `waitForSpill()` returns, the `if (spillFile.exists())` branch is skipped and the test passes without exercising rotation at all. Since the test's purpose is disk rotation under capacity pressure, make the existence of the spill file a precondition:

🔧 Suggested tightening

```diff
  val spillFile = File(dir, "spill.bin")
- if (spillFile.exists()) {
-     assertTrue(
-         "Spill file should be around disk capacity, was ${spillFile.length()}",
-         spillFile.length() <= 150,
-     )
- }
+ assertTrue("Spill file must exist to test rotation", spillFile.exists())
+ assertTrue(
+     "Spill file should be around disk capacity, was ${spillFile.length()}",
+     spillFile.length() <= 150,
+ )
```

The same pattern appears in `disk capacity of 1 byte trims aggressively` (lines 836-842) and `disk capacity exactly matches one signal line` (lines 853-856); consider applying the same change there.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt` around lines 337-352: the disk rotation assertion currently skips verification if the spill file doesn't exist; change the test `disk rotation drops oldest when capacity exceeded` to require the spill file's existence after calling `waitForSpill()`: replace the conditional `if (spillFile.exists())` with an explicit assertion that `spillFile.exists()` is true, then perform the length assertion against `spillFile.length()`. Do the same tightening for the other tests mentioned (`disk capacity of 1 byte trims aggressively` and `disk capacity exactly matches one signal line`) so they assert the spill file exists before checking its size, and keep use of `createScope(...)`, `waitForSpill()`, `memoryCapacity`, and `diskCapacity` unchanged.

stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt (1)
128-141: Consider replacing `runBlocking` + `Thread.sleep` with `runTest` and a controllable dispatcher.

The init cleanup runs on `Dispatchers.Default` via the injected `scope`, and tests wait a fixed 500 ms. On slower CI agents this can be racy (flaky failures) or unnecessarily slow when cleanup finishes in 1 ms. Per the coding guidelines ("Use MockK and coroutines-test to control timing"), prefer injecting a `StandardTestDispatcher`/`UnconfinedTestDispatcher` and using `runTest { … advanceUntilIdle() }`, or expose a `Job` from the cleanup so tests can `join()` deterministically. This comment applies to every `runBlocking { … Thread.sleep(500) }` usage in both telemetry test files.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt` around lines 128 - 141, The test uses runBlocking + Thread.sleep to wait for async cleanup started by StreamTelemetryImpl, which is flaky; change the tests to use kotlinx.coroutines.test runTest with a StandardTestDispatcher/UnconfinedTestDispatcher and inject a TestScope/TestDispatcher into StreamTelemetryImpl (or expose the cleanup Job) so you can deterministically wait: construct StreamTelemetryImpl(context, config(version = "1.0.0"), testScope) then call advanceUntilIdle() (or join the exposed Job) instead of Thread.sleep; apply the same pattern to all telemetry tests that use runBlocking + Thread.sleep.
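The suggested pattern can be sketched with a stand-in for StreamTelemetryImpl. Here FakeTelemetry, main, and the shared-scheduler wiring are illustrative assumptions, not repo code; the point is that advanceUntilIdle() replaces the wall-clock sleep.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest

// FakeTelemetry stands in for StreamTelemetryImpl: it launches background
// cleanup on an injected scope, exactly the shape the review comment targets.
class FakeTelemetry(private val scope: CoroutineScope) {
    var cleaned = false
        private set

    fun startCleanup() {
        scope.launch { cleaned = true } // pretend this deletes stale version dirs
    }
}

fun main() = runTest {
    // Share the scheduler so advanceUntilIdle() drives the injected scope's work.
    val testScope = TestScope(StandardTestDispatcher(testScheduler))
    val sut = FakeTelemetry(testScope)

    sut.startCleanup()
    testScope.advanceUntilIdle() // deterministic: no Thread.sleep(500)

    check(sut.cleaned)
}
```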
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 8fd9783f-e9a4-42fb-a690-65c38324f54e
📒 Files selected for processing (10)
- stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt
- stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamTelemetryConfig.kt
- stream-android-core/src/main/java/io/getstream/android/core/api/model/telemetry/StreamSignal.kt
- stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamSignalRedactor.kt
- stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt
- stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt
- stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImpl.kt
- stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt
- stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt
- stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt
```kotlin
override fun emit(tag: String, data: Any?): Result<Unit> =
    // runCatching (not cancellable) — emit is not a suspend function.
    runCatching {
        val raw = StreamSignal(tag = tag, data = data, timestamp = System.currentTimeMillis())
        val signal = redactor?.redact(raw) ?: raw
        synchronized(lock) {
            buffer.add(signal)
            if (buffer.size > memoryCapacity) {
                if (spilling.compareAndSet(false, true)) {
                    val snapshot = buffer
                    buffer = mutableListOf()
                    scope.launch(Dispatchers.IO) { spillToDisk(snapshot) }
                } else {
                    buffer.removeAt(0)
                }
            }
        }
    }
```
Redactor null result is not honored — signal leaks through unredacted.
Per the StreamSignalRedactor KDoc, returning null from redact(...) should drop the signal entirely. The current expression uses Elvis as a fallback when the redactor itself is null, but as written it also collapses the intentional "drop" case into the unredacted raw signal:
```kotlin
redactor = someRedactor   // configured
someRedactor.redact(raw)  // returns null to drop
result                    // ends up being raw, not dropped
```
This both (a) violates the documented contract and (b) is a potential privacy regression — a redactor explicitly asking to drop a signal actually buffers the unredacted payload.
🛠️ Proposed fix
```diff
  runCatching {
      val raw = StreamSignal(tag = tag, data = data, timestamp = System.currentTimeMillis())
-     val signal = redactor?.redact(raw) ?: raw
+     val signal =
+         if (redactor == null) raw
+         else redactor.redact(raw) ?: return@runCatching // null means "drop"
      synchronized(lock) {
          buffer.add(signal)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt`
around lines 74 - 91, The emit method in StreamTelemetryScopeImpl currently uses
"val signal = redactor?.redact(raw) ?: raw" which accidentally re-inserts
unredacted `raw` when a non-null redactor intentionally returns null to drop a
signal; change this to call redact and if the result is null, exit without
buffering (i.e., do not add to buffer or trigger spilling). Specifically, in
emit(), replace the Elvis expression with a nullable result from
StreamSignalRedactor.redact(raw) and if that result is null, return/skip the
rest of the synchronized block so the signal is dropped rather than buffered.
```kotlin
private fun trimDiskIfNeeded(file: File) {
    if (!file.exists() || file.length() <= diskCapacity) {
        return
    }
    val lines = file.readLines().toMutableList()
    while (lines.isNotEmpty() && file.length() > diskCapacity) {
        lines.removeAt(0)
        file.writeText(lines.joinToString(separator = "\n", postfix = "\n"))
    }
}
```
trimDiskIfNeeded is O(N²) in disk I/O — rewrites entire file per dropped line.
On each iteration the loop removes one line from the in-memory list and then rewrites the full file via file.writeText(...). Under disk pressure (many signals to drop at once) this causes N rewrites of an up-to-diskCapacity-bytes file. The file.length() comparison inside the loop also re-stats after every rewrite.
Compute the target slice once (based on cumulative byte size from the tail) and do a single write. E.g.:
♻️ Proposed refactor
```diff
- private fun trimDiskIfNeeded(file: File) {
-     if (!file.exists() || file.length() <= diskCapacity) {
-         return
-     }
-     val lines = file.readLines().toMutableList()
-     while (lines.isNotEmpty() && file.length() > diskCapacity) {
-         lines.removeAt(0)
-         file.writeText(lines.joinToString(separator = "\n", postfix = "\n"))
-     }
- }
+ private fun trimDiskIfNeeded(file: File) {
+     if (!file.exists() || file.length() <= diskCapacity) return
+     val lines = file.readLines()
+     // Keep the newest suffix that fits within diskCapacity (accounting for "\n" per line).
+     var bytes = 0L
+     var startIdx = lines.size
+     for (i in lines.indices.reversed()) {
+         val next = bytes + lines[i].toByteArray(Charsets.UTF_8).size + 1
+         if (next > diskCapacity) break
+         bytes = next
+         startIdx = i
+     }
+     val kept = lines.subList(startIdx, lines.size)
+     file.writeText(if (kept.isEmpty()) "" else kept.joinToString("\n", postfix = "\n"))
+ }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt`
around lines 125 - 134, trimDiskIfNeeded currently removes one line and rewrites
the whole file repeatedly causing O(N²) disk I/O; change it to compute which
suffix of lines fits within diskCapacity once and then write that suffix in a
single file.writeText call. Specifically, in trimDiskIfNeeded: read all lines
into a list, iterate from the end accumulating the byte size of each line
(including the separator/newline) until the cumulative size <= diskCapacity,
then create the targetSlice as the tail subset that fits and call
file.writeText(targetSlice.joinToString("\n", postfix = "\n")) once (do not
re-check file.length() inside the loop). Ensure you reference the function name
trimDiskIfNeeded and use the same line-reading approach but only perform one
write.
```kotlin
private fun encode(signal: StreamSignal): String {
    val escapedTag = signal.tag.replace(DELIMITER, DELIMITER_ESCAPE)
    val escapedData = signal.data?.toString()?.replace(DELIMITER, DELIMITER_ESCAPE).orEmpty()
    return "${signal.timestamp}$DELIMITER$escapedTag$DELIMITER$escapedData"
}

@Suppress("ReturnCount")
private fun decode(line: String): StreamSignal? {
    if (line.isBlank()) {
        return null
    }
    val parts = line.split(DELIMITER, limit = 3)
    if (parts.size < 2) {
        return null
    }
    val timestamp = parts[0].toLongOrNull() ?: return null
    val tag = parts[1].replace(DELIMITER_ESCAPE, DELIMITER)
    val data = parts.getOrNull(2)?.replace(DELIMITER_ESCAPE, DELIMITER)?.ifEmpty { null }
    return StreamSignal(tag = tag, data = data, timestamp = timestamp)
}
```
Disk round-trip silently coerces non-String payloads to String.
signal.data?.toString() flattens any object payload to its string representation on spill. decode(...) then produces StreamSignal.data of type String?. This yields an observable behavior difference between the same signal drained from memory (original type preserved) vs. drained after spill (now a String). Consumers comparing is/as or deserializing the payload downstream will see inconsistent types depending on buffer pressure — an easy source of flaky production bugs.
Options, in order of preference:

- Document the contract explicitly: "`StreamSignal.data` is a `String?` after any `drain()` that went through disk" and advise emitters to pre-serialize.
- Constrain `StreamSignal.data` to `String?` at the API layer so all drains are consistent.
- Use a proper serializer (e.g. Moshi `@JsonClass(generateAdapter = true)`) for both tiers so memory vs. disk are indistinguishable.

At minimum, this deserves a KDoc note on `StreamSignal.data` and/or `emit(...)`.
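The first option pairs naturally with emitter-side pre-serialization: if disk spills flatten `data` to a String anyway, emitters can normalize up front so memory and disk drains are type-identical. A hedged sketch (the `payloadOf` helper and the commented `emit` call are hypothetical; `org.json` is bundled on Android):

```kotlin
import org.json.JSONObject

// Hypothetical helper: encode a payload to a JSON String before emit(), so a
// drained signal carries the same String type whether it came from memory or
// from a disk spill.
fun payloadOf(vararg fields: Pair<String, Any?>): String =
    JSONObject(fields.toMap()).toString()

// scope.emit("call.started", payloadOf("callId" to "abc", "durationMs" to 120))
```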
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt`
around lines 149 - 168, encode(...) currently calls signal.data?.toString() so
decode(...) always yields StreamSignal.data as String?, causing memory vs disk
round-trip type inconsistency; update the public API docs: add KDoc to
StreamSignal.data (and to emit(...) / drain(...) where relevant) explicitly
stating that spills to disk coerce data to String? and callers must
pre-serialize or expect String, and include a note next steps (optionally change
StreamSignal.data to String? or replace encode/decode with a proper serializer
such as Moshi to preserve types); ensure you reference StreamSignal.data,
encode, and decode in the KDoc so callers see the behavior.
```kotlin
@Test
fun `custom root overrides cacheDir`() = runBlocking {
    val customRoot = tempDir.newFolder("custom-root")
    val sut = StreamTelemetryImpl(context, config(version = "2.0.0", root = customRoot), scope)

    val telemetryScope = sut.scope("test")
    telemetryScope.emit("event", null)

    // Spill dir should be under custom root
    val expectedDir = File(customRoot, "stream/telemetry/2.0.0/test")
    // Even if no spill happened, the scope should use this path
    assertEquals("test", telemetryScope.name)
}
```
Test does not verify that the custom root is actually honored.
expectedDir is computed but never asserted on, and the only assertEquals checks telemetryScope.name == "test" — which is true for any scope regardless of root. As written, this test would still pass if config.root were silently ignored. Either trigger a spill and assert the spill file lands under expectedDir (and not under cacheDir), or assert that version cleanup uses customRoot (mirroring the custom basePath is used test).
🔧 Suggested tightening
```diff
  @Test
  fun `custom root overrides cacheDir`() = runBlocking {
      val customRoot = tempDir.newFolder("custom-root")
-     val sut = StreamTelemetryImpl(context, config(version = "2.0.0", root = customRoot), scope)
-
-     val telemetryScope = sut.scope("test")
-     telemetryScope.emit("event", null)
-
-     // Spill dir should be under custom root
-     val expectedDir = File(customRoot, "stream/telemetry/2.0.0/test")
-     // Even if no spill happened, the scope should use this path
-     assertEquals("test", telemetryScope.name)
+     // Seed a stale version under the custom root to verify cleanup targets it
+     // (and not cacheDir).
+     val staleUnderCustomRoot = File(customRoot, "stream/telemetry/0.9.0")
+     staleUnderCustomRoot.mkdirs()
+     val cacheDirTelemetry = File(cacheDir, "stream/telemetry/0.9.0")
+     cacheDirTelemetry.mkdirs()
+
+     StreamTelemetryImpl(context, config(version = "2.0.0", root = customRoot), scope)
+     Thread.sleep(500)
+
+     assertFalse("Stale version under custom root should be deleted", staleUnderCustomRoot.exists())
+     assertTrue("cacheDir should not be touched when custom root is set", cacheDirTelemetry.exists())
  }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt`
around lines 197 - 209, The test computes expectedDir but never verifies the
telemetry uses the custom root; update the test for StreamTelemetryImpl by
forcing a spill (e.g., emit enough events or call the telemetry scope's
flush/spill method after creating telemetryScope via
StreamTelemetryImpl.scope("test") and emitting an event) and then assert that
the actual spill path (the directory or file produced by that spill) is under
the computed expectedDir (File(customRoot, "stream/telemetry/2.0.0/test")), or
alternatively mirror the approach from the "custom basePath is used" test to
assert version cleanup/paths use config(root = customRoot) instead of cacheDir.
```kotlin
@Test
fun `redactor exception does not crash emit`() {
    val redactor = StreamSignalRedactor { throw RuntimeException("redactor boom") }
    val sut = createScope(redactor = redactor)

    sut.emit("event", null)
    // No exception thrown
}
```
Test has no assertion — a silently-failed emit would not be caught.
Since emit returns Result<Unit> (it never throws), the only thing this test currently verifies is that the method doesn't throw synchronously — which the API already guarantees. To actually exercise the redactor failure path, assert that emit returned success and that a subsequent drain still works (and ideally yields the raw signal as a fallback, matching the redactor returning null falls back to raw signal test).
🔧 Suggested tightening
```diff
  @Test
- fun `redactor exception does not crash emit`() {
+ fun `redactor exception does not crash emit`() = runBlocking {
      val redactor = StreamSignalRedactor { throw RuntimeException("redactor boom") }
      val sut = createScope(redactor = redactor)
-     sut.emit("event", null)
-     // No exception thrown
+     val emitResult = sut.emit("event", null)
+     assertTrue("emit should not surface redactor failure", emitResult.isSuccess)
+
+     // drain should still succeed; signal either falls back to raw or is dropped,
+     // depending on the documented contract.
+     val drainResult = sut.drain()
+     assertTrue(drainResult.isSuccess)
  }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt`
around lines 250 - 257, Update the test `redactor exception does not crash emit`
to actually assert the emit result and downstream behavior: call
`sut.emit("event", null)` and assert the returned Result is successful (e.g.,
result.isSuccess), then call `sut.drain()` and assert the drained signal is the
raw/fallback signal (matching the behavior tested in the `redactor returning
null falls back to raw signal` test). Use the existing `StreamSignalRedactor`
that throws and the `createScope` helper to locate the code paths
(`StreamSignalRedactor`, `createScope`, `sut.emit`, `sut.drain`) and ensure the
test fails if emit silently did nothing.
```kotlin
val signals = sut.drain().getOrThrow()
val tags = signals.map { it.tag }
assertTrue(
    "Expected disk signals before memory, got: $tags",
    tags.indexOf("a") < tags.indexOf("d"),
)
```
FIFO assertion is too weak and can pass even when "a" was dropped.
`tags.indexOf("a")` returns -1 if "a" isn't present, so `-1 < tags.indexOf("d")` evaluates to true whenever "d" is present; the test would silently pass even if the disk-before-memory invariant is broken and the oldest signal was lost. It also only checks a weak `a`-before-`d` relation, not that every disk signal precedes every memory signal, as the test name claims.
🔧 Suggested stronger assertion
```diff
- val signals = sut.drain().getOrThrow()
- val tags = signals.map { it.tag }
- assertTrue(
-     "Expected disk signals before memory, got: $tags",
-     tags.indexOf("a") < tags.indexOf("d"),
- )
+ val tags = sut.drain().getOrThrow().map { it.tag }
+ assertEquals(listOf("a", "b", "c", "d"), tags)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt`
around lines 311 - 317, The current FIFO assertion is weak because
tags.indexOf("a") can be -1; update the test around sut.drain().getOrThrow() and
tags so you first assert required disk and memory tags exist (e.g., ensure
tags.contains("a") and tags.contains("d")), then strengthen the FIFO check by
computing all disk-tag indices and all memory-tag indices and asserting that the
maximum disk index is less than the minimum memory index (ensuring every disk
signal precedes every memory signal) instead of only comparing tags.indexOf("a")
and tags.indexOf("d").
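The prompt's stronger ordering check can be sketched independently of the test class. Assuming signals "a"/"b" were spilled to disk and "c"/"d" stayed in memory (as in the reviewed test), a helper along these lines (`diskPrecedesMemory` is a hypothetical name) rejects both dropped and interleaved signals:

```kotlin
fun diskPrecedesMemory(
    tags: List<String>,
    diskTags: List<String>,
    memoryTags: List<String>,
): Boolean {
    // Presence check first: indexOf returns -1 for a missing tag, which is
    // exactly how the original `indexOf("a") < indexOf("d")` check could
    // pass after "a" was dropped.
    if (!tags.containsAll(diskTags + memoryTags)) return false
    // Every disk signal must precede every memory signal.
    return diskTags.maxOf { tags.indexOf(it) } < memoryTags.minOf { tags.indexOf(it) }
}

fun main() {
    val disk = listOf("a", "b")
    val memory = listOf("c", "d")
    check(diskPrecedesMemory(listOf("a", "b", "c", "d"), disk, memory))   // correct order
    check(!diskPrecedesMemory(listOf("b", "c", "d"), disk, memory))       // "a" dropped
    check(!diskPrecedesMemory(listOf("a", "c", "b", "d"), disk, memory))  // interleaved
}
```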
```kotlin
fun `emit does not throw when spill directory is read-only`() = runBlocking {
    val readOnlyDir = tempDir.newFolder("readonly")
    readOnlyDir.setReadOnly()

    val sut = createScope(memoryCapacity = 2, dir = File(readOnlyDir, "nested"))

    repeat(5) { sut.emit("event-$it", null) }
    waitForSpill()

    val result = sut.drain()
    assertTrue("drain should succeed even after spill failure", result.isSuccess)
}
```
🧩 Analysis chain
🌐 Web query:
Does java.io.File.setReadOnly() on a directory prevent creation of new files inside it on Linux when the process runs as root?
💡 Result:
No, java.io.File.setReadOnly on a directory does not prevent creation of new files inside it on Linux when the process runs as root.
Citations:
- 1: https://stackoverflow.com/questions/27100628/does-root-overrides-read-only-permissioneven-to-root-set-to-directory-in-unix
- 2: https://stackoverflow.com/questions/74453303/cannot-make-file-readonly-in-java-8-on-linux
- 3: https://stackoverflow.com/questions/61057493/why-does-files-iswritable-method-return-true-for-a-directory-even-when-it-is-s
- 4: https://unix.stackexchange.com/questions/504223/why-am-i-able-to-still-write-to-a-file-marked-400
🏁 Script executed:

```shell
# Find and examine the test file
find . -name "StreamTelemetryScopeImplTest.kt" -type f
```

Repository: GetStream/stream-core-android
Length of output: 186

🏁 Script executed:

```shell
# Once found, examine the test code at the specified lines
head -n 620 ./stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt | tail -n 40
```

Repository: GetStream/stream-core-android
Length of output: 1372

🏁 Script executed:

```shell
# Also check the second test mentioned (lines 897-917)
sed -n '885,930p' ./stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt
```

Repository: GetStream/stream-core-android
Length of output: 1493

🏁 Script executed:

```shell
# Find the StreamTelemetryScopeImpl implementation
find . -name "StreamTelemetryScopeImpl.kt" -type f
```

Repository: GetStream/stream-core-android
Length of output: 182

🏁 Script executed:

```shell
# Check the implementation around spill directory creation
grep -n "spillDir\|mkdirs\|mkdir" ./stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt | head -20
```

Repository: GetStream/stream-core-android
Length of output: 197

🏁 Script executed:

```shell
# Look at how emit handles spill failures
sed -n '1,100p' ./stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt
```

Repository: GetStream/stream-core-android
Length of output: 4013

🏁 Script executed:

```shell
# Get the spillToDisk method to see how it handles failures
sed -n '100,200p' ./stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt
```

Repository: GetStream/stream-core-android
Length of output: 2790

🏁 Script executed:

```shell
# Get the drainDisk method
sed -n '200,300p' ./stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt
```

Repository: GetStream/stream-core-android
Length of output: 55

🏁 Script executed:

```shell
# Check if there are environment/permission checks elsewhere in the test
grep -n "root\|setReadOnly\|permission\|Assume" ./stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt
```

Repository: GetStream/stream-core-android
Length of output: 335
Both tests using setReadOnly() are unreliable when running as root: they may pass without exercising the intended failure path.

File.setReadOnly() does not prevent directory/file creation on Linux when the process runs as root; it only clears the write bit for the owner, which root ignores. Since spillDir.mkdirs() catches all exceptions and the implementation accepts spill failures gracefully (resetting the spilling flag and returning success from drain() regardless), these tests will silently succeed with a successful spill rather than validating resilience to permission failures.

Use a deterministic write barrier instead: make the parent directory a regular file (e.g., File(existingFile, "nested") where existingFile is a real file), so mkdirs() must fail regardless of uid. Alternatively, guard with Assume.assumeFalse(isRunningAsRoot).

Applies to:
- Lines 586-597: `emit does not throw when spill directory is read-only`
- Lines 897-917: `emit survives when spill dir is read-only`
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt`
around lines 586 - 597, The two tests 'emit does not throw when spill directory
is read-only' and 'emit survives when spill dir is read-only' are flaky because
File.setReadOnly() can be ignored by root; replace the unreliable permission
approach with a deterministic write barrier: create a real file and pass
File(existingFile, "nested") as the spill dir so mkdirs() will always fail, or
add an early guard using Assume.assumeFalse(isRunningAsRoot()) to skip the test
under root; update the test setup in the methods that call
createScope(memoryCapacity = ..., dir = ...) to use the existing-file-parent
pattern (or the Assume guard) so the spill path is exercised consistently.
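The file-as-parent barrier is deterministic because no uid, root included, can create a directory beneath a path component that is a regular file. A standalone sketch of the pattern (outside the test class, so `java.nio` temp APIs stand in for the JUnit `TemporaryFolder` helper):

```kotlin
import java.io.File
import java.nio.file.Files

fun main() {
    val tmp = Files.createTempDirectory("barrier-demo").toFile()

    // A regular file used as a parent path component: mkdirs() on anything
    // beneath it fails (ENOTDIR semantics) even when running as root,
    // unlike setReadOnly(), which root simply ignores.
    val barrierFile = File(tmp, "barrier").apply { check(createNewFile()) }
    val spillDir = File(barrierFile, "nested")

    check(!spillDir.mkdirs()) // deterministic failure on any platform/uid
    check(!spillDir.exists())
}
```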
Goal

Introduce a general-purpose telemetry engine in core that product SDKs (Chat, Video, Feeds) can use to collect, buffer, and drain operational signals.

Implementation

API layer (`api/telemetry/`):
- `StreamSignal`: data class with tag + data + timestamp
- `StreamTelemetryScope`: named scope with `emit()` and `drain()`
- `StreamTelemetry`: engine interface, factory function, and `StreamTelemetryNoOp`
- `StreamSignalRedactor`: anonymization hook that runs on `emit()`
- `StreamTelemetryConfig`: memory/disk capacity, configurable paths, SDK-versioned disk layout

Internal layer (`internal/telemetry/`):
- `StreamTelemetryScopeImpl`: buffer-swap thread safety, memory-first with disk spill on overflow, drops oldest under pressure
- `StreamTelemetryImpl`: scope factory with stale version cleanup on init

Integration:
- Added `telemetry: StreamTelemetry? = null` to `StreamComponentProvider`
- Defaults to `StreamTelemetryNoOp` (zero cost)

Design highlights:
- `emit()` never throws or affects the caller

Testing
- `StreamTelemetryScopeImpl` (emit, drain, spill, redaction, thread safety)
- `StreamTelemetryImpl` (scope creation, version cleanup)

pr: core
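The buffer-swap thread safety mentioned for `StreamTelemetryScopeImpl` can be sketched in isolation. This is a minimal illustration of the pattern, not the SDK's implementation: `drain()` swaps the live buffer for a fresh one under a short lock, then does any slow work (disk reads, serialization) on the detached buffer, so emitters never wait on I/O:

```kotlin
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

// Hypothetical minimal buffer-swap container illustrating the pattern.
class SwapBuffer<T> {
    private val lock = ReentrantLock()
    private var buffer = mutableListOf<T>()

    // Fast path: append under a short lock, no I/O ever held here.
    fun emit(item: T) {
        lock.withLock { buffer.add(item) }
    }

    // Swap out the live buffer, then process the old one lock-free.
    fun drain(): List<T> {
        val drained = lock.withLock {
            val old = buffer
            buffer = mutableListOf()
            old
        }
        // Slow work on `drained` would happen here, outside the lock.
        return drained
    }
}

fun main() {
    val b = SwapBuffer<String>()
    b.emit("a")
    b.emit("b")
    check(b.drain() == listOf("a", "b")) // first drain returns buffered signals
    check(b.drain().isEmpty())           // drain clears; second drain is empty
}
```

The same swap is what makes drain-clears-buffer and concurrent emit+drain semantics testable without sleeping on timing.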