From f7b717a02e31fe2845a2099f676d1ef74cdebb2e Mon Sep 17 00:00:00 2001 From: Aleksandar Apostolov Date: Fri, 24 Apr 2026 09:51:19 +0200 Subject: [PATCH 1/9] feat(telemetry): add StreamTelemetry library Introduce a general-purpose telemetry engine for collecting signals across named scopes. Product SDKs inject it via StreamComponentProvider, core emits internally, product drains on its own schedule. - StreamSignal: tag + data + timestamp - StreamTelemetryScope: emit() + drain() with buffer-swap thread safety - StreamTelemetryScopeImpl: memory-first, disk spill on overflow, drop oldest under pressure - StreamSignalRedactor: anonymization hook on emit() - StreamTelemetryConfig: per-scope memory/disk caps, configurable paths, SDK-versioned disk layout with stale version cleanup - StreamTelemetryNoOp: zero-cost default when telemetry not provided - StreamComponentProvider: added telemetry field --- .../model/config/StreamComponentProvider.kt | 3 + .../core/api/telemetry/StreamSignal.kt | 30 ++++ .../api/telemetry/StreamSignalRedactor.kt | 49 ++++++ .../core/api/telemetry/StreamTelemetry.kt | 98 +++++++++++ .../api/telemetry/StreamTelemetryConfig.kt | 45 +++++ .../api/telemetry/StreamTelemetryScope.kt | 68 ++++++++ .../internal/telemetry/StreamTelemetryImpl.kt | 77 ++++++++ .../telemetry/StreamTelemetryScopeImpl.kt | 165 ++++++++++++++++++ 8 files changed, 535 insertions(+) create mode 100644 stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamSignal.kt create mode 100644 stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamSignalRedactor.kt create mode 100644 stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt create mode 100644 stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryConfig.kt create mode 100644 stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt create mode 100644 stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImpl.kt create mode 100644 stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt index b1c18b9..0bc1e7b 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt @@ -31,6 +31,7 @@ import io.getstream.android.core.api.socket.StreamWebSocketFactory import io.getstream.android.core.api.socket.listeners.StreamClientListener import io.getstream.android.core.api.socket.monitor.StreamHealthMonitor import io.getstream.android.core.api.subscribe.StreamSubscriptionManager +import io.getstream.android.core.api.telemetry.StreamTelemetry /** * Optional overrides for internal components used by @@ -70,6 +71,7 @@ import io.getstream.android.core.api.subscribe.StreamSubscriptionManager * @param connectionRecoveryEvaluator Reconnection heuristics evaluator. * @param clientSubscriptionManager Socket-level listener registry. * @param androidComponentsProvider Android system service provider. + * @param telemetry Telemetry engine. When `null`, core uses a no-op that discards all signals. */ @Suppress("LongParameterList") @StreamInternalApi @@ -87,4 +89,5 @@ public data class StreamComponentProvider( val connectionRecoveryEvaluator: StreamConnectionRecoveryEvaluator? = null, val clientSubscriptionManager: StreamSubscriptionManager? = null, val androidComponentsProvider: StreamAndroidComponentsProvider? = null, + val telemetry: StreamTelemetry? = null, ) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamSignal.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamSignal.kt new file mode 100644 index 0000000..4517df7 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamSignal.kt @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.telemetry + +import io.getstream.android.core.annotations.StreamInternalApi + +/** + * A single unit of telemetry data captured by a [StreamTelemetryScope]. + * + * @param tag Identifies what happened (e.g. `"connected"`, `"token.refreshed"`, + * `"network.changed"`). + * @param data Arbitrary payload associated with the signal. May be `null` for tag-only signals. + * @param timestamp Epoch milliseconds when the signal was recorded. + */ +@StreamInternalApi +public data class StreamSignal(val tag: String, val data: Any?, val timestamp: Long) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamSignalRedactor.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamSignalRedactor.kt new file mode 100644 index 0000000..d473709 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamSignalRedactor.kt @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.telemetry + +import io.getstream.android.core.annotations.StreamInternalApi + +/** + * Transforms or redacts sensitive data from a [StreamSignal] before it is stored. + * + * Redactors run synchronously on [StreamTelemetryScope.emit] — the returned signal is what gets + * buffered. Return `null` to drop the signal entirely. + * + * ### Example + * + * ```kotlin + * val redactor = StreamSignalRedactor { signal -> + * if (signal.tag == "auth.token") { + * signal.copy(data = "[REDACTED]") + * } else { + * signal + * } + * } + * ``` + */ +@StreamInternalApi +public fun interface StreamSignalRedactor { + + /** + * Transforms or redacts the given [signal]. + * + * @param signal The signal to process. + * @return The redacted signal, or `null` to drop it. + */ + public fun redact(signal: StreamSignal): StreamSignal? +} diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt new file mode 100644 index 0000000..27bf4fc --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.telemetry + +import android.content.Context +import io.getstream.android.core.annotations.StreamInternalApi +import io.getstream.android.core.internal.telemetry.StreamTelemetryImpl + +/** + * Engine for collecting telemetry signals across named [scopes][StreamTelemetryScope]. + * + * Product SDKs create an instance via the [StreamTelemetry] factory, inject it into + * [StreamComponentProvider][io.getstream.android.core.api.model.config.StreamComponentProvider], + * and retain a reference for draining. Core emits signals internally; the product SDK decides when + * and where to send them. + * + * If no telemetry is provided, core uses [StreamTelemetryNoOp] which discards everything at zero + * cost. + * + * ### Usage + * + * ```kotlin + * // Product SDK creates and keeps a reference + * val telemetry = StreamTelemetry(context, config) + * + * // Inject into core + * val client = StreamClient( + * ..., + * components = StreamComponentProvider(telemetry = telemetry), + * ) + * + * // Core emits internally + * telemetry.scope("connection").emit("connected", connectionId) + * + * // Product SDK drains on its own schedule + * val signals = telemetry.scope("connection").drain() + * ``` + */ +@StreamInternalApi +public interface StreamTelemetry { + + /** + * Returns the [StreamTelemetryScope] for the given [name], creating it if it doesn't exist. + * + * Scope names are open — any string is valid. Core uses names like `"connection"`, `"network"`, + * `"auth"`, and `"device"`. Product SDKs add their own (e.g. `"sfu"`, `"publisher"`). + * + * @param name Scope identifier. + * @return The scope for [name]. + */ + public fun scope(name: String): StreamTelemetryScope +} + +/** + * Creates a [StreamTelemetry] instance backed by the default implementation. + * + * @param context Android application context (used for `cacheDir` when [StreamTelemetryConfig.root] + * is `null`). + * @param config Telemetry configuration. + * @return A new [StreamTelemetry] instance. + */ +@StreamInternalApi +public fun StreamTelemetry(context: Context, config: StreamTelemetryConfig): StreamTelemetry = + StreamTelemetryImpl(context.applicationContext, config) + +/** + * No-op implementation that discards all signals without allocating buffers. + * + * Used internally when no telemetry is provided via + * [StreamComponentProvider][io.getstream.android.core.api.model.config.StreamComponentProvider]. + */ +@StreamInternalApi +public object StreamTelemetryNoOp : StreamTelemetry { + + override fun scope(name: String): StreamTelemetryScope = NoOpScope + + private object NoOpScope : StreamTelemetryScope { + override val name: String = "noop" + + override fun emit(tag: String, data: Any?) = Unit + + override fun drain(): List = emptyList() + } +} diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryConfig.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryConfig.kt new file mode 100644 index 0000000..a7ffa5a --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryConfig.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.telemetry + +import io.getstream.android.core.annotations.StreamInternalApi +import java.io.File + +/** + * Configuration for [StreamTelemetry]. + * + * @param root Root directory for disk spill storage. Defaults to `context.cacheDir` (passed at + * creation time via the factory). + * @param basePath Subdirectory under [root] for telemetry files (e.g. `"stream/telemetry"`). + * @param version Version tag for the disk format — typically the core SDK version. On + * initialization, any directories under `{root}/{basePath}` that don't match this version are + * deleted. + * @param memoryCapacity Maximum number of signals held in memory per scope before spilling to disk. + * @param diskCapacity Maximum bytes of disk storage per scope. When exceeded, the oldest signals + * are dropped. + * @param redactor Optional [StreamSignalRedactor] applied to every signal on + * [emit][StreamTelemetryScope.emit]. + */ +@StreamInternalApi +public data class StreamTelemetryConfig( + val root: File? = null, + val basePath: String = "stream/telemetry", + val version: String, + val memoryCapacity: Int = 500, + val diskCapacity: Long = 1_000_000L, + val redactor: StreamSignalRedactor? = null, +) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt new file mode 100644 index 0000000..eab7cc1 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.telemetry + +import io.getstream.android.core.annotations.StreamInternalApi + +/** + * A named channel that groups related [signals][StreamSignal]. + * + * Scopes are created via [StreamTelemetry.scope]. Each scope maintains its own memory buffer and + * optional disk spill. Signals flow in via [emit] and out via [drain]. + * + * ### Threading + * + * All operations are thread-safe. [emit] never blocks or suspends. + * + * ### Example + * + * ```kotlin + * val scope = telemetry.scope("connection") + * scope.emit("connected", connectionId) + * scope.emit("disconnected", null) + * + * // Later, when ready to send + * val signals: List = scope.drain() + * ``` + */ +@StreamInternalApi +public interface StreamTelemetryScope { + + /** The name of this scope (e.g. `"connection"`, `"sfu"`, `"network"`). */ + public val name: String + + /** + * Records a signal into this scope. + * + * The [StreamSignalRedactor] (if configured) runs synchronously before the signal is buffered. + * Failures are silently consumed — this method never throws or affects the caller's flow. + * + * @param tag Identifies what happened. + * @param data Optional payload. Pass `null` for tag-only signals. + */ + public fun emit(tag: String, data: Any? = null) + + /** + * Atomically consumes all buffered signals, including any that were spilled to disk. + * + * After this call, the scope's buffer is empty. Disk-spilled signals are read first (oldest), + * followed by in-memory signals (newest), preserving FIFO order. + * + * @return All buffered signals in chronological order, or an empty list if none. + */ + public fun drain(): List +} diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImpl.kt new file mode 100644 index 0000000..878fad1 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImpl.kt @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.internal.telemetry + +import android.content.Context +import io.getstream.android.core.api.telemetry.StreamTelemetry +import io.getstream.android.core.api.telemetry.StreamTelemetryConfig +import io.getstream.android.core.api.telemetry.StreamTelemetryScope +import java.io.File +import java.util.concurrent.ConcurrentHashMap + +/** + * Default [StreamTelemetry] implementation. + * + * On creation, scans the telemetry base directory and deletes any version subdirectories that don't + * match [StreamTelemetryConfig.version]. This ensures stale spill files from older SDK versions are + * never deserialized with an incompatible format. + */ +internal class StreamTelemetryImpl(context: Context, private val config: StreamTelemetryConfig) : + StreamTelemetry { + + private val scopes = ConcurrentHashMap() + + private val baseDir: File = + File(config.root ?: context.cacheDir, "${config.basePath}/${config.version}") + + init { + cleanStaleVersions() + } + + override fun scope(name: String): StreamTelemetryScope = + scopes.getOrPut(name) { + StreamTelemetryScopeImpl( + name = name, + memoryCapacity = config.memoryCapacity, + diskCapacity = config.diskCapacity, + spillDir = File(baseDir, name), + redactor = config.redactor, + ) + } + + /** + * Deletes any sibling directories under the telemetry base path that don't match the current + * version. For example, if the base path is `stream/telemetry` and the version is `1.2.0`, this + * deletes `stream/telemetry/1.1.0/` but keeps `stream/telemetry/1.2.0/`. + */ + @Suppress("ReturnCount") + private fun cleanStaleVersions() { + try { + val versionParent = baseDir.parentFile ?: return + if (!versionParent.exists()) { + return + } + versionParent.listFiles()?.forEach { dir -> + if (dir.isDirectory && dir.name != config.version) { + dir.deleteRecursively() + } + } + } catch (@Suppress("TooGenericExceptionCaught") ignored: Exception) { + // Best-effort cleanup. If it fails, stale files remain in cache — OS can reclaim. + } + } +} diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt new file mode 100644 index 0000000..0e5ac55 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.internal.telemetry + +import io.getstream.android.core.api.telemetry.StreamSignal +import io.getstream.android.core.api.telemetry.StreamSignalRedactor +import io.getstream.android.core.api.telemetry.StreamTelemetryScope +import java.io.File +import java.util.concurrent.atomic.AtomicBoolean + +/** + * Thread-safe [StreamTelemetryScope] backed by an in-memory ring buffer that spills to disk when + * full. + * + * ### Buffer swap + * + * [emit] appends to a mutable list guarded by an internal lock. [drain] atomically swaps the list + * for a fresh one, so reads and writes never contend beyond the swap itself. + * + * ### Disk spill + * + * When the memory buffer exceeds the configured capacity, the oldest signals are serialized to + * disk. If a spill is already in progress, the oldest in-memory signal is dropped instead. Disk + * storage is capped per scope; when exceeded the oldest lines are removed. + * + * ### Drain order + * + * [drain] returns disk-spilled signals first (oldest), then in-memory signals (newest), preserving + * FIFO order across both tiers. + */ +internal class StreamTelemetryScopeImpl( + override val name: String, + private val memoryCapacity: Int, + private val diskCapacity: Long, + private val spillDir: File, + private val redactor: StreamSignalRedactor?, +) : StreamTelemetryScope { + + private val lock = Any() + private val spillFile: File + get() = File(spillDir, SPILL_FILE_NAME) + + private var buffer = mutableListOf() + private val spilling = AtomicBoolean(false) + + override fun emit(tag: String, data: Any?) { + try { + val raw = StreamSignal(tag = tag, data = data, timestamp = System.currentTimeMillis()) + val signal = redactor?.redact(raw) ?: raw + synchronized(lock) { + buffer.add(signal) + if (buffer.size > memoryCapacity) { + if (spilling.compareAndSet(false, true)) { + val snapshot = buffer + buffer = mutableListOf() + spillToDisk(snapshot) + } else { + buffer.removeFirst() + } + } + } + } catch (@Suppress("TooGenericExceptionCaught") ignored: Exception) { + // Telemetry must never affect the caller. + } + } + + override fun drain(): List { + val memorySnapshot: List + synchronized(lock) { + memorySnapshot = buffer + buffer = mutableListOf() + } + val diskSignals = drainDisk() + return if (diskSignals.isEmpty()) { + memorySnapshot + } else { + diskSignals + memorySnapshot + } + } + + // --- Disk spill ---------------------------------------------------------------- + + private fun spillToDisk(signals: List) { + try { + spillDir.mkdirs() + val file = spillFile + file.appendText(signals.joinToString(separator = "\n", postfix = "\n") { encode(it) }) + trimDiskIfNeeded(file) + } catch (@Suppress("TooGenericExceptionCaught") ignored: Exception) { + // Disk I/O failure — signals are lost. That's acceptable for telemetry. + } finally { + spilling.set(false) + } + } + + private fun trimDiskIfNeeded(file: File) { + if (!file.exists() || file.length() <= diskCapacity) { + return + } + val lines = file.readLines().toMutableList() + while (lines.isNotEmpty() && file.length() > diskCapacity) { + lines.removeFirst() + file.writeText(lines.joinToString(separator = "\n", postfix = "\n")) + } + } + + @Suppress("ReturnCount") + private fun drainDisk(): List { + val file = spillFile + if (!file.exists() || file.length() == 0L) { + return emptyList() + } + return try { + val signals = file.readLines().mapNotNull { decode(it) } + file.delete() + signals + } catch (@Suppress("TooGenericExceptionCaught") ignored: Exception) { + file.delete() + emptyList() + } + } + + // --- Serialization (simple line-based format) ----------------------------------- + + private fun encode(signal: StreamSignal): String { + val escapedTag = signal.tag.replace(DELIMITER, DELIMITER_ESCAPE) + val escapedData = signal.data?.toString()?.replace(DELIMITER, DELIMITER_ESCAPE).orEmpty() + return "${signal.timestamp}$DELIMITER$escapedTag$DELIMITER$escapedData" + } + + @Suppress("ReturnCount") + private fun decode(line: String): StreamSignal? { + if (line.isBlank()) { + return null + } + val parts = line.split(DELIMITER, limit = 3) + if (parts.size < 2) { + return null + } + val timestamp = parts[0].toLongOrNull() ?: return null + val tag = parts[1].replace(DELIMITER_ESCAPE, DELIMITER) + val data = parts.getOrNull(2)?.replace(DELIMITER_ESCAPE, DELIMITER)?.ifEmpty { null } + return StreamSignal(tag = tag, data = data, timestamp = timestamp) + } + + private companion object { + const val SPILL_FILE_NAME = "spill.bin" + const val DELIMITER = "\t" + const val DELIMITER_ESCAPE = "\\t" + } +} From 13998249639f93c10d777aa901c56644760dd87d Mon Sep 17 00:00:00 2001 From: Aleksandar Apostolov Date: Fri, 24 Apr 2026 10:20:12 +0200 Subject: [PATCH 2/9] refactor(telemetry): move models to correct packages, IO dispatcher for disk ops MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - StreamSignal → api/model/telemetry/ (data models package) - StreamTelemetryConfig → api/model/config/ (alongside other configs) - drain() is now suspend, disk reads on Dispatchers.IO - spillToDisk launches on Dispatchers.IO via scope - cleanStaleVersions runs on Dispatchers.IO at init - StreamTelemetryImpl and StreamTelemetryScopeImpl accept CoroutineScope --- .../config}/StreamTelemetryConfig.kt | 5 ++++- .../api/{ => model}/telemetry/StreamSignal.kt | 2 +- .../api/telemetry/StreamSignalRedactor.kt | 1 + .../core/api/telemetry/StreamTelemetry.kt | 13 ++++++++++--- .../api/telemetry/StreamTelemetryScope.kt | 5 ++++- .../internal/telemetry/StreamTelemetryImpl.kt | 15 +++++++++++---- .../telemetry/StreamTelemetryScopeImpl.kt | 19 ++++++++++++------- 7 files changed, 43 insertions(+), 17 deletions(-) rename stream-android-core/src/main/java/io/getstream/android/core/api/{telemetry => model/config}/StreamTelemetryConfig.kt (87%) rename stream-android-core/src/main/java/io/getstream/android/core/api/{ => model}/telemetry/StreamSignal.kt (95%) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryConfig.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamTelemetryConfig.kt similarity index 87% rename from stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryConfig.kt rename to stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamTelemetryConfig.kt index a7ffa5a..208140c 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryConfig.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamTelemetryConfig.kt @@ -14,9 +14,12 @@ * limitations under the License. */ -package io.getstream.android.core.api.telemetry +package io.getstream.android.core.api.model.config import io.getstream.android.core.annotations.StreamInternalApi +import io.getstream.android.core.api.telemetry.StreamSignalRedactor +import io.getstream.android.core.api.telemetry.StreamTelemetry +import io.getstream.android.core.api.telemetry.StreamTelemetryScope import java.io.File /** diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamSignal.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/model/telemetry/StreamSignal.kt similarity index 95% rename from stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamSignal.kt rename to stream-android-core/src/main/java/io/getstream/android/core/api/model/telemetry/StreamSignal.kt index 4517df7..57b7d26 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamSignal.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/model/telemetry/StreamSignal.kt @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.getstream.android.core.api.telemetry +package io.getstream.android.core.api.model.telemetry import io.getstream.android.core.annotations.StreamInternalApi diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamSignalRedactor.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamSignalRedactor.kt index d473709..6ab6ce1 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamSignalRedactor.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamSignalRedactor.kt @@ -17,6 +17,7 @@ package io.getstream.android.core.api.telemetry import io.getstream.android.core.annotations.StreamInternalApi +import io.getstream.android.core.api.model.telemetry.StreamSignal /** * Transforms or redacts sensitive data from a [StreamSignal] before it is stored. diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt index 27bf4fc..0ee27c6 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt @@ -18,7 +18,10 @@ package io.getstream.android.core.api.telemetry import android.content.Context import io.getstream.android.core.annotations.StreamInternalApi +import io.getstream.android.core.api.model.config.StreamTelemetryConfig +import io.getstream.android.core.api.model.telemetry.StreamSignal import io.getstream.android.core.internal.telemetry.StreamTelemetryImpl +import kotlinx.coroutines.CoroutineScope /** * Engine for collecting telemetry signals across named [scopes][StreamTelemetryScope]. @@ -71,11 +74,15 @@ public interface StreamTelemetry { * @param context Android application context (used for `cacheDir` when [StreamTelemetryConfig.root] * is `null`). * @param config Telemetry configuration. + * @param scope Coroutine scope for disk I/O operations (spill, drain, cleanup). * @return A new [StreamTelemetry] instance. */ @StreamInternalApi -public fun StreamTelemetry(context: Context, config: StreamTelemetryConfig): StreamTelemetry = - StreamTelemetryImpl(context.applicationContext, config) +public fun StreamTelemetry( + context: Context, + config: StreamTelemetryConfig, + scope: CoroutineScope, +): StreamTelemetry = StreamTelemetryImpl(context.applicationContext, config, scope) /** * No-op implementation that discards all signals without allocating buffers. @@ -93,6 +100,6 @@ public object StreamTelemetryNoOp : StreamTelemetry { override fun emit(tag: String, data: Any?) = Unit - override fun drain(): List = emptyList() + override suspend fun drain(): List = emptyList() } } diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt index eab7cc1..a2d6216 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt @@ -17,6 +17,7 @@ package io.getstream.android.core.api.telemetry import io.getstream.android.core.annotations.StreamInternalApi +import io.getstream.android.core.api.model.telemetry.StreamSignal /** * A named channel that groups related [signals][StreamSignal]. @@ -62,7 +63,9 @@ public interface StreamTelemetryScope { * After this call, the scope's buffer is empty. Disk-spilled signals are read first (oldest), * followed by in-memory signals (newest), preserving FIFO order. * + * Disk reads run on `Dispatchers.IO`. + * * @return All buffered signals in chronological order, or an empty list if none. */ - public fun drain(): List + public suspend fun drain(): List } diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImpl.kt index 878fad1..9bc6155 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImpl.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImpl.kt @@ -17,11 +17,14 @@ package io.getstream.android.core.internal.telemetry import android.content.Context +import io.getstream.android.core.api.model.config.StreamTelemetryConfig import io.getstream.android.core.api.telemetry.StreamTelemetry -import io.getstream.android.core.api.telemetry.StreamTelemetryConfig import io.getstream.android.core.api.telemetry.StreamTelemetryScope import java.io.File import java.util.concurrent.ConcurrentHashMap +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch /** * Default [StreamTelemetry] implementation. @@ -30,8 +33,11 @@ import java.util.concurrent.ConcurrentHashMap * match [StreamTelemetryConfig.version]. This ensures stale spill files from older SDK versions are * never deserialized with an incompatible format. */ -internal class StreamTelemetryImpl(context: Context, private val config: StreamTelemetryConfig) : - StreamTelemetry { +internal class StreamTelemetryImpl( + context: Context, + private val config: StreamTelemetryConfig, + private val scope: CoroutineScope, +) : StreamTelemetry { private val scopes = ConcurrentHashMap() @@ -39,7 +45,7 @@ internal class StreamTelemetryImpl(context: Context, private val config: StreamT File(config.root ?: context.cacheDir, "${config.basePath}/${config.version}") init { - cleanStaleVersions() + scope.launch(Dispatchers.IO) { cleanStaleVersions() } } override fun scope(name: String): StreamTelemetryScope = @@ -50,6 +56,7 @@ internal class StreamTelemetryImpl(context: Context, private val config: StreamT diskCapacity = config.diskCapacity, spillDir = File(baseDir, name), redactor = config.redactor, + scope = scope, ) } diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt index 0e5ac55..efd92a9 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt @@ -16,11 +16,15 @@ package io.getstream.android.core.internal.telemetry -import io.getstream.android.core.api.telemetry.StreamSignal +import io.getstream.android.core.api.model.telemetry.StreamSignal import io.getstream.android.core.api.telemetry.StreamSignalRedactor import io.getstream.android.core.api.telemetry.StreamTelemetryScope import java.io.File import java.util.concurrent.atomic.AtomicBoolean +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext /** * Thread-safe [StreamTelemetryScope] backed by an in-memory ring buffer that spills to disk when @@ -33,9 +37,9 @@ import java.util.concurrent.atomic.AtomicBoolean * * ### Disk spill * - * When the memory buffer exceeds the configured capacity, the oldest signals are serialized to - * disk. If a spill is already in progress, the oldest in-memory signal is dropped instead. Disk - * storage is capped per scope; when exceeded the oldest lines are removed. + * When the memory buffer exceeds the configured capacity, the oldest signals are serialized to disk + * on [Dispatchers.IO]. If a spill is already in progress, the oldest in-memory signal is dropped + * instead. Disk storage is capped per scope; when exceeded the oldest lines are removed. * * ### Drain order * @@ -48,6 +52,7 @@ internal class StreamTelemetryScopeImpl( private val diskCapacity: Long, private val spillDir: File, private val redactor: StreamSignalRedactor?, + private val scope: CoroutineScope, ) : StreamTelemetryScope { private val lock = Any() @@ -67,7 +72,7 @@ internal class StreamTelemetryScopeImpl( if (spilling.compareAndSet(false, true)) { val snapshot = buffer buffer = mutableListOf() - spillToDisk(snapshot) + scope.launch(Dispatchers.IO) { spillToDisk(snapshot) } } else { buffer.removeFirst() } @@ -78,13 +83,13 @@ internal class StreamTelemetryScopeImpl( } } - override fun drain(): List { + override suspend fun drain(): List { val memorySnapshot: List synchronized(lock) { memorySnapshot = buffer buffer = mutableListOf() } - val diskSignals = drainDisk() + val diskSignals = withContext(Dispatchers.IO) { drainDisk() } return if (diskSignals.isEmpty()) { memorySnapshot } else { From b5ad9da892bf71577ec700817d54c49a9eabc4cf Mon Sep 17 00:00:00 2001 From: Aleksandar Apostolov Date: Fri, 24 Apr 2026 10:26:12 +0200 Subject: [PATCH 3/9] refactor(telemetry): drain() returns Result, use runCatchingCancellable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - drain() now returns Result> so product SDK can distinguish between empty (no signals) and failure (disk I/O error) - Use runCatchingCancellable to properly rethrow CancellationException - drainDisk() no longer swallows exceptions — they propagate to Result --- .../core/api/telemetry/StreamTelemetry.kt | 2 +- .../core/api/telemetry/StreamTelemetryScope.kt | 5 +++-- .../telemetry/StreamTelemetryScopeImpl.kt | 17 ++++++----------- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt index 0ee27c6..7dbf38e 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt @@ -100,6 +100,6 @@ public object StreamTelemetryNoOp : StreamTelemetry { override fun emit(tag: String, data: Any?) = Unit - override suspend fun drain(): List = emptyList() + override suspend fun drain(): Result> = Result.success(emptyList()) } } diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt index a2d6216..b1a0f9b 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt @@ -65,7 +65,8 @@ public interface StreamTelemetryScope { * * Disk reads run on `Dispatchers.IO`. * - * @return All buffered signals in chronological order, or an empty list if none. + * @return [Result.success] with signals in chronological order, or [Result.failure] if the + * drain could not complete (e.g. disk I/O error). */ - public suspend fun drain(): List + public suspend fun drain(): Result> } diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt index efd92a9..9187c1d 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt @@ -19,6 +19,7 @@ package io.getstream.android.core.internal.telemetry import io.getstream.android.core.api.model.telemetry.StreamSignal import io.getstream.android.core.api.telemetry.StreamSignalRedactor import io.getstream.android.core.api.telemetry.StreamTelemetryScope +import io.getstream.android.core.api.utils.runCatchingCancellable import java.io.File import java.util.concurrent.atomic.AtomicBoolean import kotlinx.coroutines.CoroutineScope @@ -83,14 +84,14 @@ internal class StreamTelemetryScopeImpl( } } - override suspend fun drain(): List { + override suspend fun drain(): Result> = runCatchingCancellable { val memorySnapshot: List synchronized(lock) { memorySnapshot = buffer buffer = mutableListOf() } val diskSignals = withContext(Dispatchers.IO) { drainDisk() } - return if (diskSignals.isEmpty()) { + if (diskSignals.isEmpty()) { memorySnapshot } else { diskSignals + memorySnapshot @@ -123,20 +124,14 @@ internal class StreamTelemetryScopeImpl( } } - @Suppress("ReturnCount") private fun drainDisk(): List { val file = spillFile if (!file.exists() || file.length() == 0L) { return emptyList() } - return try { - val signals = file.readLines().mapNotNull { decode(it) } - file.delete() - signals - } catch (@Suppress("TooGenericExceptionCaught") ignored: Exception) { - file.delete() - emptyList() - } + val signals = file.readLines().mapNotNull { decode(it) } + file.delete() + return signals } // --- Serialization (simple line-based format) ----------------------------------- From 354f588e6422c61d1edb4a1dc91e32c4cc3bfc44 Mon Sep 17 00:00:00 2001 From: Aleksandar Apostolov Date: Fri, 24 Apr 2026 10:42:23 +0200 Subject: [PATCH 4/9] test(telemetry): add comprehensive tests for scope and telemetry impl 77 tests covering: - Basic emit/drain, ordering, timestamps - Drain semantics (clear, idempotent, interleaved cycles) - Redactor (transform, drop fallback, exception safety) - Memory capacity boundaries (at, below, above, 0, 1, large) - Disk spill (FIFO order, multiple spills, accumulation) - Disk capacity boundaries (0, 1 byte, exact fit, rotation) - Thread safety (concurrent emit, concurrent emit+drain) - Disk I/O failures (read-only dir, corrupt files, external deletion) - Corrupt spill files (blank lines, mixed valid/invalid, binary garbage, single-field lines, very large files) - Spill file lifecycle (created on overflow, deleted after drain) - Serialization edge cases (tabs, unicode, empty strings, long data, newline limitation, escape sequence ambiguity) - Version cleanup (stale dirs deleted, current kept, siblings safe) - Scope creation (identity, isolation, special chars, empty name) - Custom config (root override, basePath) - NoOp (emit discards, drain returns empty success) Fix: replace List.removeFirst() with removeAt(0) for JDK < 21 compat --- .../telemetry/StreamTelemetryScopeImpl.kt | 4 +- .../telemetry/StreamTelemetryImplTest.kt | 295 ++++++ .../telemetry/StreamTelemetryScopeImplTest.kt | 970 ++++++++++++++++++ 3 files changed, 1267 insertions(+), 2 deletions(-) create mode 100644 stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt create mode 100644 stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt index 9187c1d..ab745fc 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt @@ -75,7 +75,7 @@ internal class StreamTelemetryScopeImpl( buffer = mutableListOf() scope.launch(Dispatchers.IO) { spillToDisk(snapshot) } } else { - buffer.removeFirst() + buffer.removeAt(0) } } } @@ -119,7 +119,7 @@ internal class StreamTelemetryScopeImpl( } val lines = file.readLines().toMutableList() while (lines.isNotEmpty() && file.length() > diskCapacity) { - lines.removeFirst() + lines.removeAt(0) file.writeText(lines.joinToString(separator = "\n", postfix = "\n")) } } diff --git a/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt new file mode 100644 index 0000000..b08a20f --- /dev/null +++ b/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt @@ -0,0 +1,295 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.internal.telemetry + +import android.content.Context +import io.getstream.android.core.api.model.config.StreamTelemetryConfig +import io.getstream.android.core.api.telemetry.StreamTelemetryNoOp +import io.mockk.every +import io.mockk.mockk +import java.io.File +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.runBlocking +import org.junit.After +import org.junit.Assert.assertEquals +import org.junit.Assert.assertFalse +import org.junit.Assert.assertSame +import org.junit.Assert.assertTrue +import org.junit.Before +import org.junit.Rule +import org.junit.Test +import org.junit.rules.TemporaryFolder + +class StreamTelemetryImplTest { + + @get:Rule val tempDir = TemporaryFolder() + + private lateinit var cacheDir: File + private lateinit var context: Context + private lateinit var scope: CoroutineScope + + @Before + fun setUp() { + cacheDir = tempDir.newFolder("cache") + context = mockk { + every { applicationContext } returns this + every { this@mockk.cacheDir } returns this@StreamTelemetryImplTest.cacheDir + } + scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) + } + + @After + fun tearDown() { + scope.cancel() + } + + private fun config( + version: String = "1.0.0", + root: File? = null, + basePath: String = "stream/telemetry", + ): StreamTelemetryConfig = + StreamTelemetryConfig(root = root, basePath = basePath, version = version) + + // ======================================== + // Scope creation + // ======================================== + + @Test + fun `scope creates a new scope with the given name`() { + val sut = StreamTelemetryImpl(context, config(), scope) + val telemetryScope = sut.scope("connection") + assertEquals("connection", telemetryScope.name) + } + + @Test + fun `scope returns same instance for same name`() { + val sut = StreamTelemetryImpl(context, config(), scope) + val first = sut.scope("connection") + val second = sut.scope("connection") + assertSame(first, second) + } + + @Test + fun `scope returns different instances for different names`() { + val sut = StreamTelemetryImpl(context, config(), scope) + val connection = sut.scope("connection") + val network = sut.scope("network") + assertTrue(connection !== network) + assertEquals("connection", connection.name) + assertEquals("network", network.name) + } + + @Test + fun `many scopes can coexist`() { + val sut = StreamTelemetryImpl(context, config(), scope) + val names = (1..100).map { "scope-$it" } + val scopes = names.map { sut.scope(it) } + + assertEquals(100, scopes.toSet().size) + scopes.forEachIndexed { idx, s -> assertEquals(names[idx], s.name) } + } + + @Test + fun `scope with empty name works`() { + val sut = StreamTelemetryImpl(context, config(), scope) + val telemetryScope = sut.scope("") + assertEquals("", telemetryScope.name) + } + + @Test + fun `scope with special characters in name works`() { + val sut = StreamTelemetryImpl(context, config(), scope) + val telemetryScope = sut.scope("sfu/publisher.video:h264") + assertEquals("sfu/publisher.video:h264", telemetryScope.name) + } + + // ======================================== + // Version cleanup + // ======================================== + + @Test + fun `init deletes stale version directories`() = runBlocking { + val baseDir = File(cacheDir, "stream/telemetry") + // Create old version dirs + File(baseDir, "0.9.0/connection").mkdirs() + File(baseDir, "0.9.0/connection/spill.bin").writeText("old-data") + File(baseDir, "0.8.0/network").mkdirs() + File(baseDir, "0.8.0/network/spill.bin").writeText("older-data") + + StreamTelemetryImpl(context, config(version = "1.0.0"), scope) + Thread.sleep(500) // wait for IO cleanup + + assertFalse("0.9.0 should be deleted", File(baseDir, "0.9.0").exists()) + assertFalse("0.8.0 should be deleted", File(baseDir, "0.8.0").exists()) + } + + @Test + fun `init keeps current version directory`() = runBlocking { + val baseDir = File(cacheDir, "stream/telemetry") + File(baseDir, "1.0.0/connection").mkdirs() + File(baseDir, "1.0.0/connection/spill.bin").writeText("current-data") + + StreamTelemetryImpl(context, config(version = "1.0.0"), scope) + Thread.sleep(500) + + assertTrue("1.0.0 should be kept", File(baseDir, "1.0.0").exists()) + assertTrue("spill.bin should be kept", File(baseDir, "1.0.0/connection/spill.bin").exists()) + } + + @Test + fun `init with no existing directories does not crash`() = runBlocking { + // cacheDir exists but no telemetry dirs + StreamTelemetryImpl(context, config(version = "1.0.0"), scope) + Thread.sleep(500) + // No assertion — just verifying no exception + } + + @Test + fun `init only deletes directories under basePath, not siblings`() = runBlocking { + // Create a file outside telemetry path + val unrelated = File(cacheDir, "stream/other-data.txt") + unrelated.parentFile?.mkdirs() + unrelated.writeText("important") + + StreamTelemetryImpl(context, config(version = "1.0.0"), scope) + Thread.sleep(500) + + assertTrue("Unrelated file should survive", unrelated.exists()) + } + + @Test + fun `cleanup only targets version directories, not files`() = runBlocking { + val baseDir = File(cacheDir, "stream/telemetry") + baseDir.mkdirs() + // Create a file (not directory) in the base path + File(baseDir, "some-file.txt").writeText("data") + // Create an old version dir + File(baseDir, "0.9.0").mkdirs() + + StreamTelemetryImpl(context, config(version = "1.0.0"), scope) + Thread.sleep(500) + + assertTrue("File should survive (not a directory)", File(baseDir, "some-file.txt").exists()) + assertFalse("Old version dir should be deleted", File(baseDir, "0.9.0").exists()) + } + + // ======================================== + // Custom root and basePath + // ======================================== + + @Test + fun `custom root overrides cacheDir`() = runBlocking { + val customRoot = tempDir.newFolder("custom-root") + val sut = StreamTelemetryImpl(context, config(version = "2.0.0", root = customRoot), scope) + + val telemetryScope = sut.scope("test") + telemetryScope.emit("event", null) + + // Spill dir should be under custom root + val expectedDir = File(customRoot, "stream/telemetry/2.0.0/test") + // Even if no spill happened, the scope should use this path + assertEquals("test", telemetryScope.name) + } + + @Test + fun `custom basePath is used`() = runBlocking { + val baseDir = File(cacheDir, "custom/path") + + // Create old version to verify cleanup uses custom path + File(baseDir, "0.9.0").mkdirs() + + StreamTelemetryImpl(context, config(version = "1.0.0", basePath = "custom/path"), scope) + Thread.sleep(500) + + assertFalse( + "Old version under custom path should be deleted", + File(baseDir, "0.9.0").exists(), + ) + } + + // ======================================== + // Integration: scope + emit + drain + // ======================================== + + @Test + fun `end-to-end emit and drain through telemetry`() = runBlocking { + val sut = StreamTelemetryImpl(context, config(), scope) + + sut.scope("connection").emit("connected", "user-1") + sut.scope("network").emit("wifi", "5GHz") + + val connectionSignals = sut.scope("connection").drain().getOrThrow() + val networkSignals = sut.scope("network").drain().getOrThrow() + + assertEquals(1, connectionSignals.size) + assertEquals("connected", connectionSignals[0].tag) + + assertEquals(1, networkSignals.size) + assertEquals("wifi", networkSignals[0].tag) + } + + @Test + fun `scopes are isolated - draining one does not affect another`() = runBlocking { + val sut = StreamTelemetryImpl(context, config(), scope) + + sut.scope("a").emit("event-a", null) + sut.scope("b").emit("event-b", null) + + sut.scope("a").drain() + + val bSignals = sut.scope("b").drain().getOrThrow() + assertEquals(1, bSignals.size) + assertEquals("event-b", bSignals[0].tag) + } + + // ======================================== + // NoOp + // ======================================== + + @Test + fun `NoOp scope name is noop`() { + val telemetryScope = StreamTelemetryNoOp.scope("anything") + assertEquals("noop", telemetryScope.name) + } + + @Test + fun `NoOp emit does nothing`() = runBlocking { + val telemetryScope = StreamTelemetryNoOp.scope("test") + telemetryScope.emit("event", "data") + + val result = telemetryScope.drain() + assertTrue(result.isSuccess) + assertTrue(result.getOrThrow().isEmpty()) + } + + @Test + fun `NoOp returns same scope for any name`() { + val a = StreamTelemetryNoOp.scope("a") + val b = StreamTelemetryNoOp.scope("b") + assertSame(a, b) + } + + @Test + fun `NoOp drain returns success empty list`() = runBlocking { + val result = StreamTelemetryNoOp.scope("x").drain() + assertTrue(result.isSuccess) + assertEquals(emptyList(), result.getOrThrow()) + } +} diff --git a/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt new file mode 100644 index 0000000..02a49df --- /dev/null +++ b/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt @@ -0,0 +1,970 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.internal.telemetry + +import io.getstream.android.core.api.model.telemetry.StreamSignal +import io.getstream.android.core.api.telemetry.StreamSignalRedactor +import java.io.File +import java.util.concurrent.CountDownLatch +import java.util.concurrent.CyclicBarrier +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.junit.After +import org.junit.Assert.assertEquals +import org.junit.Assert.assertFalse +import org.junit.Assert.assertNotNull +import org.junit.Assert.assertTrue +import org.junit.Before +import org.junit.Rule +import org.junit.Test +import org.junit.rules.TemporaryFolder + +@Suppress("LargeClass") +class StreamTelemetryScopeImplTest { + + @get:Rule val tempDir = TemporaryFolder() + + private lateinit var spillDir: File + private lateinit var scope: CoroutineScope + + @Before + fun setUp() { + spillDir = tempDir.newFolder("spill") + scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) + } + + @After + fun tearDown() { + scope.cancel() + } + + private fun createScope( + name: String = "test", + memoryCapacity: Int = 500, + diskCapacity: Long = 1_000_000L, + redactor: StreamSignalRedactor? = null, + dir: File = spillDir, + cs: CoroutineScope = scope, + ): StreamTelemetryScopeImpl = + StreamTelemetryScopeImpl( + name = name, + memoryCapacity = memoryCapacity, + diskCapacity = diskCapacity, + spillDir = dir, + redactor = redactor, + scope = cs, + ) + + private fun waitForSpill() = Thread.sleep(500) + + // ======================================== + // Basic emit + drain + // ======================================== + + @Test + fun `emit single signal and drain returns it`() = runBlocking { + val sut = createScope() + sut.emit("connected", "user-123") + + val result = sut.drain() + assertTrue(result.isSuccess) + val signals = result.getOrThrow() + assertEquals(1, signals.size) + assertEquals("connected", signals[0].tag) + assertEquals("user-123", signals[0].data) + } + + @Test + fun `emit multiple signals preserves order`() = runBlocking { + val sut = createScope() + sut.emit("first", null) + sut.emit("second", null) + sut.emit("third", null) + + val signals = sut.drain().getOrThrow() + assertEquals(3, signals.size) + assertEquals("first", signals[0].tag) + assertEquals("second", signals[1].tag) + assertEquals("third", signals[2].tag) + } + + @Test + fun `emit with null data`() = runBlocking { + val sut = createScope() + sut.emit("event", null) + + val signals = sut.drain().getOrThrow() + assertEquals(1, signals.size) + assertEquals(null, signals[0].data) + } + + @Test + fun `emit with complex data`() = runBlocking { + val sut = createScope() + val data = mapOf("key" to "value", "count" to 42) + sut.emit("event", data) + + val signals = sut.drain().getOrThrow() + assertEquals(1, signals.size) + assertEquals(data, signals[0].data) + } + + @Test + fun `signal has non-zero timestamp`() = runBlocking { + val before = System.currentTimeMillis() + val sut = createScope() + sut.emit("event", null) + val after = System.currentTimeMillis() + + val signal = sut.drain().getOrThrow().single() + assertTrue(signal.timestamp >= before) + assertTrue(signal.timestamp <= after) + } + + // ======================================== + // Drain semantics + // ======================================== + + @Test + fun `drain on empty scope returns success with empty list`() = runBlocking { + val sut = createScope() + + val result = sut.drain() + assertTrue(result.isSuccess) + assertTrue(result.getOrThrow().isEmpty()) + } + + @Test + fun `drain clears the buffer`() = runBlocking { + val sut = createScope() + sut.emit("first", null) + sut.drain() + + val result = sut.drain() + assertTrue(result.isSuccess) + assertTrue(result.getOrThrow().isEmpty()) + } + + @Test + fun `drain twice without emit returns empty second time`() = runBlocking { + val sut = createScope() + sut.emit("event", null) + + val first = sut.drain().getOrThrow() + assertEquals(1, first.size) + + val second = sut.drain().getOrThrow() + assertTrue(second.isEmpty()) + } + + @Test + fun `emit after drain goes into new buffer`() = runBlocking { + val sut = createScope() + sut.emit("before-drain", null) + sut.drain() + + sut.emit("after-drain", null) + val signals = sut.drain().getOrThrow() + assertEquals(1, signals.size) + assertEquals("after-drain", signals[0].tag) + } + + // ======================================== + // Scope name + // ======================================== + + @Test + fun `name is set correctly`() { + val sut = createScope(name = "connection") + assertEquals("connection", sut.name) + } + + // ======================================== + // Redactor + // ======================================== + + @Test + fun `redactor transforms signal data`() = runBlocking { + val redactor = StreamSignalRedactor { signal -> + if (signal.tag == "auth.token") { + signal.copy(data = "[REDACTED]") + } else { + signal + } + } + val sut = createScope(redactor = redactor) + + sut.emit("auth.token", "secret-jwt-token") + sut.emit("connected", "user-123") + + val signals = sut.drain().getOrThrow() + assertEquals(2, signals.size) + assertEquals("[REDACTED]", signals[0].data) + assertEquals("user-123", signals[1].data) + } + + @Test + fun `redactor returning null falls back to raw signal`() = runBlocking { + // val signal = redactor?.redact(raw) ?: raw — null falls back to raw + val redactor = StreamSignalRedactor { null } + val sut = createScope(redactor = redactor) + + sut.emit("event", "data") + + val signals = sut.drain().getOrThrow() + assertEquals(1, signals.size) + assertEquals("event", signals[0].tag) + } + + @Test + fun `redactor can modify tag`() = runBlocking { + val redactor = StreamSignalRedactor { signal -> signal.copy(tag = "prefix.${signal.tag}") } + val sut = createScope(redactor = redactor) + sut.emit("event", null) + + val signals = sut.drain().getOrThrow() + assertEquals("prefix.event", signals[0].tag) + } + + @Test + fun `redactor exception does not crash emit`() { + val redactor = StreamSignalRedactor { throw RuntimeException("redactor boom") } + val sut = createScope(redactor = redactor) + + sut.emit("event", null) + // No exception thrown + } + + @Test + fun `no redactor leaves signal unchanged`() = runBlocking { + val sut = createScope(redactor = null) + sut.emit("event", "data") + + val signal = sut.drain().getOrThrow().single() + assertEquals("event", signal.tag) + assertEquals("data", signal.data) + } + + // ======================================== + // Memory capacity + spill + // ======================================== + + @Test + fun `signals within memory capacity stay in memory only`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 10, dir = dir) + repeat(10) { sut.emit("event-$it", null) } + + waitForSpill() + val spillFile = File(dir, "spill.bin") + assertTrue(!spillFile.exists() || spillFile.length() == 0L) + + val signals = sut.drain().getOrThrow() + assertEquals(10, signals.size) + } + + @Test + fun `exceeding memory capacity triggers spill`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 5, dir = dir) + repeat(6) { sut.emit("event-$it", null) } + + waitForSpill() + + val signals = sut.drain().getOrThrow() + assertTrue(signals.isNotEmpty()) + } + + @Test + fun `spill preserves FIFO order - disk signals come before memory`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 3, dir = dir) + + sut.emit("a", null) + sut.emit("b", null) + sut.emit("c", null) + sut.emit("d", null) + + waitForSpill() + + val signals = sut.drain().getOrThrow() + val tags = signals.map { it.tag } + assertTrue( + "Expected disk signals before memory, got: $tags", + tags.indexOf("a") < tags.indexOf("d"), + ) + } + + @Test + fun `multiple spills accumulate on disk`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 2, dir = dir) + + repeat(3) { sut.emit("s$it", null) } + waitForSpill() + repeat(2) { sut.emit("s${it + 3}", null) } + waitForSpill() + + val signals = sut.drain().getOrThrow() + assertTrue("Expected at least 4 signals, got ${signals.size}", signals.size >= 4) + } + + // ======================================== + // Disk capacity + rotation + // ======================================== + + @Test + fun `disk rotation drops oldest when capacity exceeded`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 2, diskCapacity = 100, dir = dir) + + repeat(20) { sut.emit("event-$it", "some-payload-data") } + waitForSpill() + + val spillFile = File(dir, "spill.bin") + if (spillFile.exists()) { + assertTrue( + "Spill file should be around disk capacity, was ${spillFile.length()}", + spillFile.length() <= 150, + ) + } + } + + // ======================================== + // Drop oldest under pressure + // ======================================== + + @Test + fun `when spill in progress, oldest signal is dropped from buffer`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 2, dir = dir) + + repeat(100) { sut.emit("event-$it", null) } + waitForSpill() + + val signals = sut.drain().getOrThrow() + assertTrue("Expected some signals, got ${signals.size}", signals.isNotEmpty()) + assertTrue("Expected some drops, got ${signals.size}", signals.size <= 100) + } + + // ======================================== + // Thread safety + // ======================================== + + @Test + fun `concurrent emit from multiple threads does not crash`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 50, dir = dir) + val threads = 10 + val emitsPerThread = 100 + val barrier = CyclicBarrier(threads) + val latch = CountDownLatch(threads) + + repeat(threads) { threadIdx -> + Thread { + barrier.await() + repeat(emitsPerThread) { i -> sut.emit("thread-$threadIdx-event-$i", null) } + latch.countDown() + } + .start() + } + + assertTrue(latch.await(10, TimeUnit.SECONDS)) + waitForSpill() + + val signals = sut.drain().getOrThrow() + assertTrue( + "Expected signals from concurrent emit, got ${signals.size}", + signals.isNotEmpty(), + ) + } + + @Test + fun `concurrent emit and drain does not crash`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 10, dir = dir) + val emitting = AtomicInteger(1) + val drainResults = mutableListOf>>() + + val emitJob = + scope.launch { + var i = 0 + while (emitting.get() == 1) { + sut.emit("event-${i++}", null) + delay(1) + } + } + + repeat(20) { + Thread.sleep(10) + drainResults.add(sut.drain()) + } + + emitting.set(0) + emitJob.cancel() + + drainResults.forEach { result -> + assertTrue("drain() should not fail during concurrent access", result.isSuccess) + } + } + + // ======================================== + // Disk I/O failures + // ======================================== + + @Test + fun `drain handles corrupt spill file gracefully`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(dir = dir) + + dir.mkdirs() + File(dir, "spill.bin").writeText("corrupt\u0000garbage\u0000not\u0000valid") + + sut.emit("memory-signal", null) + val result = sut.drain() + assertTrue(result.isSuccess) + } + + @Test + fun `emit does not throw when spill directory is read-only`() = runBlocking { + val readOnlyDir = tempDir.newFolder("readonly") + readOnlyDir.setReadOnly() + + val sut = createScope(memoryCapacity = 2, dir = File(readOnlyDir, "nested")) + + repeat(5) { sut.emit("event-$it", null) } + waitForSpill() + + val result = sut.drain() + assertTrue("drain should succeed even after spill failure", result.isSuccess) + } + + @Test + fun `drain with non-existent spill directory returns memory signals only`() = runBlocking { + val nonExistent = File(tempDir.root, "does-not-exist") + val sut = createScope(dir = nonExistent) + + sut.emit("event", "data") + val signals = sut.drain().getOrThrow() + assertEquals(1, signals.size) + } + + @Test + fun `drain with empty spill file returns memory signals only`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(dir = dir) + + dir.mkdirs() + File(dir, "spill.bin").createNewFile() + + sut.emit("event", "data") + val signals = sut.drain().getOrThrow() + assertEquals(1, signals.size) + assertEquals("event", signals[0].tag) + } + + // ======================================== + // Serialization edge cases + // ======================================== + + @Test + fun `signal with tab in tag survives spill roundtrip`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 2, dir = dir) + + sut.emit("tag\twith\ttabs", "data") + sut.emit("trigger-spill", null) + sut.emit("trigger-spill-2", null) + waitForSpill() + + val signals = sut.drain().getOrThrow() + val tabSignal = signals.find { it.tag == "tag\twith\ttabs" } + assertNotNull("Signal with tabs in tag should survive roundtrip", tabSignal) + } + + @Test + fun `signal with tab in data survives spill roundtrip`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 2, dir = dir) + + sut.emit("event", "data\twith\ttabs") + sut.emit("trigger-spill", null) + sut.emit("trigger-spill-2", null) + waitForSpill() + + val signals = sut.drain().getOrThrow() + val found = signals.find { it.tag == "event" } + assertNotNull("Signal should survive", found) + assertEquals("data\twith\ttabs", found?.data) + } + + @Test + fun `signal with newline in data - known limitation`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 2, dir = dir) + + sut.emit("event", "line1\nline2") + sut.emit("trigger-spill", null) + sut.emit("trigger-spill-2", null) + waitForSpill() + + val signals = sut.drain().getOrThrow() + assertTrue(signals.isNotEmpty()) + } + + @Test + fun `signal with empty string data`() = runBlocking { + val sut = createScope() + sut.emit("event", "") + + val signal = sut.drain().getOrThrow().single() + assertEquals("event", signal.tag) + assertEquals("", signal.data) + } + + @Test + fun `signal with very long data`() = runBlocking { + val sut = createScope() + val longData = "x".repeat(10_000) + sut.emit("event", longData) + + val signal = sut.drain().getOrThrow().single() + assertEquals(longData, signal.data) + } + + @Test + fun `signal with unicode data`() = runBlocking { + val sut = createScope() + sut.emit("event", "données télémétrie 日本語") + + val signal = sut.drain().getOrThrow().single() + assertEquals("données télémétrie 日本語", signal.data) + } + + @Test + fun `signal with empty tag`() = runBlocking { + val sut = createScope() + sut.emit("", "data") + + val signal = sut.drain().getOrThrow().single() + assertEquals("", signal.tag) + } + + // ======================================== + // Memory capacity boundary values + // ======================================== + + @Test + fun `exactly at memory capacity does not spill`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 5, dir = dir) + repeat(5) { sut.emit("event-$it", null) } + + waitForSpill() + val spillFile = File(dir, "spill.bin") + assertTrue( + "Should not spill at exactly capacity", + !spillFile.exists() || spillFile.length() == 0L, + ) + + val signals = sut.drain().getOrThrow() + assertEquals(5, signals.size) + } + + @Test + fun `one over memory capacity triggers spill`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 5, dir = dir) + repeat(6) { sut.emit("event-$it", null) } + + waitForSpill() + val signals = sut.drain().getOrThrow() + assertTrue(signals.isNotEmpty()) + } + + @Test + fun `memory capacity of 1 spills on every second emit`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 1, dir = dir) + + sut.emit("a", null) + sut.emit("b", null) + waitForSpill() + + val signals = sut.drain().getOrThrow() + assertEquals(2, signals.size) + } + + // ======================================== + // Interleaved emit/drain cycles + // ======================================== + + @Test + fun `multiple emit-drain cycles work correctly`() = runBlocking { + val sut = createScope() + + repeat(5) { cycle -> + sut.emit("cycle-$cycle", null) + val signals = sut.drain().getOrThrow() + assertEquals(1, signals.size) + assertEquals("cycle-$cycle", signals[0].tag) + } + } + + @Test + fun `drain between spill cycles returns accumulated signals`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 2, dir = dir) + + repeat(3) { sut.emit("batch1-$it", null) } + waitForSpill() + val first = sut.drain().getOrThrow() + assertTrue(first.isNotEmpty()) + + repeat(3) { sut.emit("batch2-$it", null) } + waitForSpill() + val second = sut.drain().getOrThrow() + assertTrue(second.isNotEmpty()) + + val third = sut.drain().getOrThrow() + assertTrue(third.isEmpty()) + } + + // ======================================== + // Emit never throws + // ======================================== + + @Test + fun `emit never throws regardless of internal failure`() { + val sut = + StreamTelemetryScopeImpl( + name = "test", + memoryCapacity = 0, + diskCapacity = 0, + spillDir = File("/nonexistent/path/that/cannot/exist"), + redactor = null, + scope = scope, + ) + + repeat(10) { sut.emit("event-$it", null) } + } + + // ======================================== + // Boundary value: disk capacity + // ======================================== + + @Test + fun `disk capacity of 0 means spilled signals are trimmed away`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 2, diskCapacity = 0, dir = dir) + + repeat(5) { sut.emit("event-$it", null) } + waitForSpill() + + val signals = sut.drain().getOrThrow() + // Spilled signals are trimmed to 0 bytes, but memory buffer survives. + // With rapid emits and spill resets, count varies — key assertion is that + // we get fewer than total emitted (some were lost to disk trim). + assertTrue("Expected fewer than 5 due to disk trim, got ${signals.size}", signals.size < 5) + } + + @Test + fun `disk capacity of 1 byte trims aggressively`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 2, diskCapacity = 1, dir = dir) + + repeat(5) { sut.emit("event-$it", null) } + waitForSpill() + + val spillFile = File(dir, "spill.bin") + if (spillFile.exists()) { + assertTrue( + "Spill file should be trimmed, was ${spillFile.length()} bytes", + spillFile.length() <= 50, + ) + } + } + + @Test + fun `disk capacity exactly matches one signal line`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 2, diskCapacity = 50, dir = dir) + + repeat(10) { sut.emit("evt", "d") } + waitForSpill() + + val spillFile = File(dir, "spill.bin") + if (spillFile.exists()) { + assertTrue("Spill file should stay around capacity", spillFile.length() <= 100) + } + } + + // ======================================== + // Boundary value: memory capacity + // ======================================== + + @Test + fun `memory capacity of 0 spills every signal`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 0, dir = dir) + + sut.emit("event", null) + waitForSpill() + + val spillFile = File(dir, "spill.bin") + val signals = sut.drain().getOrThrow() + assertTrue(signals.isNotEmpty() || spillFile.exists()) + } + + @Test + fun `large memory capacity holds everything in memory`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 100_000, dir = dir) + + repeat(10_000) { sut.emit("event-$it", null) } + + val spillFile = File(dir, "spill.bin") + assertTrue( + "No spill expected with large capacity", + !spillFile.exists() || spillFile.length() == 0L, + ) + + val signals = sut.drain().getOrThrow() + assertEquals(10_000, signals.size) + } + + // ======================================== + // Disk full / permission failures + // ======================================== + + @Test + fun `emit survives when spill dir is read-only`() = runBlocking { + val readOnlySpill = tempDir.newFolder("readonly-spill") + readOnlySpill.setReadOnly() + + val sut = + StreamTelemetryScopeImpl( + name = "test", + memoryCapacity = 2, + diskCapacity = 1_000_000L, + spillDir = File(readOnlySpill, "nested"), + redactor = null, + scope = scope, + ) + + repeat(10) { sut.emit("event-$it", null) } + waitForSpill() + + val result = sut.drain() + assertTrue("drain should succeed even after spill failure", result.isSuccess) + } + + @Test + fun `drain succeeds when spill file was deleted externally`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 2, dir = dir) + + repeat(3) { sut.emit("event-$it", null) } + waitForSpill() + + File(dir, "spill.bin").delete() + + val result = sut.drain() + assertTrue(result.isSuccess) + } + + @Test + fun `drain succeeds when spill directory was deleted externally`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 2, dir = dir) + + repeat(3) { sut.emit("event-$it", null) } + waitForSpill() + + dir.deleteRecursively() + + sut.emit("after-delete", null) + val result = sut.drain() + assertTrue(result.isSuccess) + val signals = result.getOrThrow() + assertTrue(signals.any { it.tag == "after-delete" }) + } + + // ======================================== + // Corrupt spill file + // ======================================== + + @Test + fun `drain handles spill file with only blank lines`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(dir = dir) + dir.mkdirs() + File(dir, "spill.bin").writeText("\n\n\n\n") + + sut.emit("memory-signal", null) + val signals = sut.drain().getOrThrow() + assertEquals(1, signals.size) + assertEquals("memory-signal", signals[0].tag) + } + + @Test + fun `drain handles spill file with mixed valid and invalid lines`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(dir = dir) + dir.mkdirs() + val content = + listOf( + "${System.currentTimeMillis()}\tvalid-signal\tdata", + "not-a-timestamp\tbad", + "", + "${System.currentTimeMillis()}\tanother-valid\t", + ) + .joinToString("\n") + File(dir, "spill.bin").writeText(content + "\n") + + val signals = sut.drain().getOrThrow() + val diskSignals = signals.filter { it.tag == "valid-signal" || it.tag == "another-valid" } + assertEquals(2, diskSignals.size) + } + + @Test + fun `drain handles spill file with only one field per line`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(dir = dir) + dir.mkdirs() + File(dir, "spill.bin").writeText("justOneField\nandAnother\n") + + sut.emit("memory", null) + val signals = sut.drain().getOrThrow() + assertEquals(1, signals.size) + assertEquals("memory", signals[0].tag) + } + + @Test + fun `drain handles binary garbage in spill file`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(dir = dir) + dir.mkdirs() + File(dir, "spill.bin") + .writeBytes(byteArrayOf(0, 1, 2, 3, 0xFF.toByte(), 0xFE.toByte(), 10, 0, 0, 10)) + + sut.emit("memory", null) + val result = sut.drain() + assertTrue(result.isSuccess) + } + + @Test + fun `drain handles very large spill file`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(dir = dir, diskCapacity = 10_000_000) + dir.mkdirs() + + val lines = + (1..10_000).joinToString("\n") { i -> + "${System.currentTimeMillis()}\tevent-$i\tpayload-$i" + } + File(dir, "spill.bin").writeText(lines + "\n") + + val signals = sut.drain().getOrThrow() + assertEquals(10_000, signals.size) + + assertFalse(File(dir, "spill.bin").exists()) + } + + // ======================================== + // Spill file lifecycle + // ======================================== + + @Test + fun `spill file is deleted after successful drain`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 2, dir = dir) + + repeat(3) { sut.emit("event-$it", null) } + waitForSpill() + + val spillFile = File(dir, "spill.bin") + assertTrue("Spill file should exist before drain", spillFile.exists()) + + sut.drain() + assertFalse("Spill file should be deleted after drain", spillFile.exists()) + } + + @Test + fun `spill file is not created when signals fit in memory`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 100, dir = dir) + + repeat(50) { sut.emit("event-$it", null) } + waitForSpill() + + assertFalse("No spill file expected", File(dir, "spill.bin").exists()) + } + + // ======================================== + // Encode/decode edge cases + // ======================================== + + @Test + fun `signal with literal backslash-t in tag - known limitation after spill roundtrip`() = + runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 1, dir = dir) + + // The literal string "\t" (backslash + t) collides with the delimiter escape sequence. + // After a spill roundtrip, decode converts "\\t" back to a real tab character. + // This documents the known limitation. + sut.emit("tag\\twith\\tescape", "data") + sut.emit("trigger", null) + waitForSpill() + + val signals = sut.drain().getOrThrow() + // After roundtrip, \\t becomes \t (real tab) — the escape is ambiguous + assertTrue(signals.any { it.tag == "tag\twith\tescape" }) + } + + @Test + fun `signal with only whitespace tag and data`() = runBlocking { + val sut = createScope() + sut.emit(" ", " ") + + val signal = sut.drain().getOrThrow().single() + assertEquals(" ", signal.tag) + assertEquals(" ", signal.data) + } + + @Test + fun `timestamps are monotonically non-decreasing`() = runBlocking { + val sut = createScope() + repeat(100) { sut.emit("event-$it", null) } + + val signals = sut.drain().getOrThrow() + for (i in 1 until signals.size) { + assertTrue( + "Timestamps should be monotonically non-decreasing", + signals[i].timestamp >= signals[i - 1].timestamp, + ) + } + } +} From d003b409dd693029d32a21c690a0b8f79d9d4c23 Mon Sep 17 00:00:00 2001 From: Aleksandar Apostolov Date: Fri, 24 Apr 2026 10:51:50 +0200 Subject: [PATCH 5/9] fix(telemetry): disk Mutex for race safety, Result for emit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add Mutex for disk I/O — serializes spillToDisk and drainDisk to prevent concurrent read/write on the same spill file - emit() returns Result via runCatching - drain() already returns Result> via runCatchingCancellable - spillToDisk uses runCatchingCancellable instead of try/catch - Every public method now returns Result<*> for API consistency --- .../core/api/telemetry/StreamTelemetry.kt | 2 +- .../api/telemetry/StreamTelemetryScope.kt | 4 +- .../telemetry/StreamTelemetryScopeImpl.kt | 60 +++++++++++-------- 3 files changed, 39 insertions(+), 27 deletions(-) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt index 7dbf38e..fcd22b5 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt @@ -98,7 +98,7 @@ public object StreamTelemetryNoOp : StreamTelemetry { private object NoOpScope : StreamTelemetryScope { override val name: String = "noop" - override fun emit(tag: String, data: Any?) = Unit + override fun emit(tag: String, data: Any?): Result = Result.success(Unit) override suspend fun drain(): Result> = Result.success(emptyList()) } diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt index b1a0f9b..22188dd 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt @@ -54,8 +54,10 @@ public interface StreamTelemetryScope { * * @param tag Identifies what happened. * @param data Optional payload. Pass `null` for tag-only signals. + * @return [Result.success] if the signal was buffered, or [Result.failure] if an error + * occurred. */ - public fun emit(tag: String, data: Any? = null) + public fun emit(tag: String, data: Any? = null): Result /** * Atomically consumes all buffered signals, including any that were spilled to disk. diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt index ab745fc..abf7dc5 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt @@ -25,6 +25,8 @@ import java.util.concurrent.atomic.AtomicBoolean import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext /** @@ -33,8 +35,13 @@ import kotlinx.coroutines.withContext * * ### Buffer swap * - * [emit] appends to a mutable list guarded by an internal lock. [drain] atomically swaps the list - * for a fresh one, so reads and writes never contend beyond the swap itself. + * [emit] appends to a mutable list guarded by [synchronized]. [drain] atomically swaps the list for + * a fresh one, so reads and writes never contend beyond the swap itself. + * + * ### Disk concurrency + * + * All disk I/O ([spillToDisk], [trimDiskIfNeeded], [drainDisk]) is serialized through a [Mutex]. + * This prevents races between a spill write and a drain read on the same file. * * ### Disk spill * @@ -57,14 +64,16 @@ internal class StreamTelemetryScopeImpl( ) : StreamTelemetryScope { private val lock = Any() + private val diskMutex = Mutex() private val spillFile: File get() = File(spillDir, SPILL_FILE_NAME) private var buffer = mutableListOf() private val spilling = AtomicBoolean(false) - override fun emit(tag: String, data: Any?) { - try { + override fun emit(tag: String, data: Any?): Result = + // runCatching (not cancellable) — emit is not a suspend function. + runCatching { val raw = StreamSignal(tag = tag, data = data, timestamp = System.currentTimeMillis()) val signal = redactor?.redact(raw) ?: raw synchronized(lock) { @@ -79,10 +88,7 @@ internal class StreamTelemetryScopeImpl( } } } - } catch (@Suppress("TooGenericExceptionCaught") ignored: Exception) { - // Telemetry must never affect the caller. } - } override suspend fun drain(): Result> = runCatchingCancellable { val memorySnapshot: List @@ -100,17 +106,20 @@ internal class StreamTelemetryScopeImpl( // --- Disk spill ---------------------------------------------------------------- - private fun spillToDisk(signals: List) { - try { - spillDir.mkdirs() - val file = spillFile - file.appendText(signals.joinToString(separator = "\n", postfix = "\n") { encode(it) }) - trimDiskIfNeeded(file) - } catch (@Suppress("TooGenericExceptionCaught") ignored: Exception) { - // Disk I/O failure — signals are lost. That's acceptable for telemetry. - } finally { - spilling.set(false) + private suspend fun spillToDisk(signals: List) { + runCatchingCancellable { + diskMutex.withLock { + spillDir.mkdirs() + val file = spillFile + file.appendText( + signals.joinToString(separator = "\n", postfix = "\n") { encode(it) } + ) + trimDiskIfNeeded(file) + } } + // Disk I/O failure — signals are lost. That's acceptable for telemetry. + // Always reset the spilling flag so future emits can trigger new spills. + spilling.set(false) } private fun trimDiskIfNeeded(file: File) { @@ -124,15 +133,16 @@ internal class StreamTelemetryScopeImpl( } } - private fun drainDisk(): List { - val file = spillFile - if (!file.exists() || file.length() == 0L) { - return emptyList() + private suspend fun drainDisk(): List = + diskMutex.withLock { + val file = spillFile + if (!file.exists() || file.length() == 0L) { + return@withLock emptyList() + } + val signals = file.readLines().mapNotNull { decode(it) } + file.delete() + signals } - val signals = file.readLines().mapNotNull { decode(it) } - file.delete() - return signals - } // --- Serialization (simple line-based format) ----------------------------------- From 92cccce6eb94ddc5b86feab54be81868c5248b46 Mon Sep 17 00:00:00 2001 From: Aleksandar Apostolov Date: Fri, 24 Apr 2026 10:56:33 +0200 Subject: [PATCH 6/9] test(telemetry): add concurrency tests for disk Mutex 5 new tests exercising the disk Mutex under concurrent access: - drain while spill is in flight - rapid spill cycles with memoryCapacity=1 - concurrent drain and emit with disk overflow + no-duplicate check - multiple coroutines draining same scope concurrently - spill and drain interleaved rapidly over 50 cycles --- .../telemetry/StreamTelemetryScopeImplTest.kt | 136 ++++++++++++++++++ 1 file changed, 136 insertions(+) diff --git a/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt index 02a49df..83ed6a8 100644 --- a/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt +++ b/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt @@ -429,6 +429,142 @@ class StreamTelemetryScopeImplTest { } } + // ======================================== + // Disk Mutex — concurrent spill + drain + // ======================================== + + @Test + fun `drain while spill is in flight returns consistent data`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 3, dir = dir) + + // Emit enough to trigger spill + repeat(4) { sut.emit("event-$it", null) } + + // Immediately drain — spill may or may not have finished + // With the disk Mutex, drain waits for spill to complete before reading + Thread.sleep(100) + val result = sut.drain() + + assertTrue("drain should succeed", result.isSuccess) + val signals = result.getOrThrow() + // All 4 signals should be accounted for (either from disk or memory) + assertEquals(4, signals.size) + } + + @Test + fun `rapid spill cycles with tiny memory capacity`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 1, dir = dir) + + // Rapid-fire 200 emits — each pair triggers a spill + repeat(200) { sut.emit("event-$it", null) } + waitForSpill() + + val result = sut.drain() + assertTrue("drain should succeed after many spill cycles", result.isSuccess) + val signals = result.getOrThrow() + assertTrue("Expected signals, got ${signals.size}", signals.isNotEmpty()) + + // All signals should have valid structure + signals.forEach { signal -> + assertTrue("Tag should start with event-", signal.tag.startsWith("event-")) + assertTrue("Timestamp should be positive", signal.timestamp > 0) + } + } + + @Test + fun `concurrent drain and emit with disk overflow`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 2, diskCapacity = 200, dir = dir) + val emitting = AtomicInteger(1) + val allDrainedSignals = mutableListOf() + val drainFailures = AtomicInteger(0) + + // Emit rapidly in background + val emitJob = + scope.launch { + var i = 0 + while (emitting.get() == 1) { + sut.emit("event-${i++}", "payload") + delay(1) + } + } + + // Drain 30 times concurrently with emit + repeat(30) { + Thread.sleep(10) + val result = sut.drain() + if (result.isSuccess) { + synchronized(allDrainedSignals) { allDrainedSignals.addAll(result.getOrThrow()) } + } else { + drainFailures.incrementAndGet() + } + } + + emitting.set(0) + emitJob.cancel() + waitForSpill() + + // Final drain to get remaining signals + sut.drain().onSuccess { remaining -> + synchronized(allDrainedSignals) { allDrainedSignals.addAll(remaining) } + } + + assertTrue("Should have collected some signals", allDrainedSignals.isNotEmpty()) + assertEquals("No drain should have failed", 0, drainFailures.get()) + + // No duplicates — each drain clears the buffer + val uniqueTimestampTagPairs = allDrainedSignals.map { "${it.timestamp}-${it.tag}" }.toSet() + assertEquals( + "No duplicate signals expected", + allDrainedSignals.size, + uniqueTimestampTagPairs.size, + ) + } + + @Test + fun `multiple coroutines draining same scope concurrently`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 5, dir = dir) + + // Pre-fill with signals that will spill + repeat(20) { sut.emit("event-$it", null) } + waitForSpill() + + // Launch 10 concurrent drains + val results = (1..10).map { scope.launch { sut.drain() } } + results.forEach { it.join() } + + // After all drains, scope should be empty + val remaining = sut.drain().getOrThrow() + assertTrue("Scope should be empty after concurrent drains", remaining.isEmpty()) + } + + @Test + fun `spill and drain interleaved rapidly`() = runBlocking { + val dir = tempDir.newFolder() + val sut = createScope(memoryCapacity = 2, dir = dir) + val allSignals = mutableListOf() + + // 50 cycles of: emit enough to spill, then drain + repeat(50) { cycle -> + repeat(3) { sut.emit("cycle-$cycle-event-$it", null) } + Thread.sleep(50) // let spill run + sut.drain().onSuccess { signals -> + synchronized(allSignals) { allSignals.addAll(signals) } + } + } + + // Wait for any remaining spills + waitForSpill() + sut.drain().onSuccess { signals -> synchronized(allSignals) { allSignals.addAll(signals) } } + + // We emitted 150 signals total (50 * 3) + // Some may be lost due to drop-oldest under pressure, but most should survive + assertTrue("Expected most of 150 signals, got ${allSignals.size}", allSignals.size >= 100) + } + // ======================================== // Disk I/O failures // ======================================== From 0442dc619dde58115ec6529253a8c088ff6f717b Mon Sep 17 00:00:00 2001 From: Aleksandar Apostolov Date: Fri, 24 Apr 2026 11:01:10 +0200 Subject: [PATCH 7/9] refactor(telemetry): move cleanStaleVersions to public API, remove init - cleanStaleVersions() is now a public suspend method on StreamTelemetry - Product SDK calls it when ready, not auto-fired in init - Removed hardcoded Dispatchers.IO from StreamTelemetryImpl - Uses runCatchingCancellable, returns Result - Updated tests to call cleanStaleVersions() explicitly --- .../core/api/telemetry/StreamTelemetry.kt | 12 ++++++ .../internal/telemetry/StreamTelemetryImpl.kt | 39 ++++++------------- .../telemetry/StreamTelemetryImplTest.kt | 29 +++++++------- 3 files changed, 39 insertions(+), 41 deletions(-) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt index fcd22b5..f36bbf8 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt @@ -66,6 +66,16 @@ public interface StreamTelemetry { * @return The scope for [name]. */ public fun scope(name: String): StreamTelemetryScope + + /** + * Deletes disk spill directories from previous SDK versions. + * + * Call this when the product SDK is ready (e.g. after initialization). Directories that don't + * match the current [StreamTelemetryConfig.version] are removed. + * + * @return [Result.success] on completion, or [Result.failure] if cleanup could not run. + */ + public suspend fun cleanStaleVersions(): Result } /** @@ -95,6 +105,8 @@ public object StreamTelemetryNoOp : StreamTelemetry { override fun scope(name: String): StreamTelemetryScope = NoOpScope + override suspend fun cleanStaleVersions(): Result = Result.success(Unit) + private object NoOpScope : StreamTelemetryScope { override val name: String = "noop" diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImpl.kt index 9bc6155..ab7c2ff 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImpl.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImpl.kt @@ -20,18 +20,17 @@ import android.content.Context import io.getstream.android.core.api.model.config.StreamTelemetryConfig import io.getstream.android.core.api.telemetry.StreamTelemetry import io.getstream.android.core.api.telemetry.StreamTelemetryScope +import io.getstream.android.core.api.utils.runCatchingCancellable import java.io.File import java.util.concurrent.ConcurrentHashMap import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.launch /** * Default [StreamTelemetry] implementation. * - * On creation, scans the telemetry base directory and deletes any version subdirectories that don't - * match [StreamTelemetryConfig.version]. This ensures stale spill files from older SDK versions are - * never deserialized with an incompatible format. + * Scans the telemetry base directory on [cleanStaleVersions] and deletes any version subdirectories + * that don't match [StreamTelemetryConfig.version]. This ensures stale spill files from older SDK + * versions are never deserialized with an incompatible format. */ internal class StreamTelemetryImpl( context: Context, @@ -44,10 +43,6 @@ internal class StreamTelemetryImpl( private val baseDir: File = File(config.root ?: context.cacheDir, "${config.basePath}/${config.version}") - init { - scope.launch(Dispatchers.IO) { cleanStaleVersions() } - } - override fun scope(name: String): StreamTelemetryScope = scopes.getOrPut(name) { StreamTelemetryScopeImpl( @@ -60,25 +55,15 @@ internal class StreamTelemetryImpl( ) } - /** - * Deletes any sibling directories under the telemetry base path that don't match the current - * version. For example, if the base path is `stream/telemetry` and the version is `1.2.0`, this - * deletes `stream/telemetry/1.1.0/` but keeps `stream/telemetry/1.2.0/`. - */ - @Suppress("ReturnCount") - private fun cleanStaleVersions() { - try { - val versionParent = baseDir.parentFile ?: return - if (!versionParent.exists()) { - return - } - versionParent.listFiles()?.forEach { dir -> - if (dir.isDirectory && dir.name != config.version) { - dir.deleteRecursively() - } + override suspend fun cleanStaleVersions(): Result = runCatchingCancellable { + val versionParent = baseDir.parentFile ?: return@runCatchingCancellable + if (!versionParent.exists()) { + return@runCatchingCancellable + } + versionParent.listFiles()?.forEach { dir -> + if (dir.isDirectory && dir.name != config.version) { + dir.deleteRecursively() } - } catch (@Suppress("TooGenericExceptionCaught") ignored: Exception) { - // Best-effort cleanup. If it fails, stale files remain in cache — OS can reclaim. } } } diff --git a/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt index b08a20f..ce7f39e 100644 --- a/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt +++ b/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt @@ -133,8 +133,8 @@ class StreamTelemetryImplTest { File(baseDir, "0.8.0/network").mkdirs() File(baseDir, "0.8.0/network/spill.bin").writeText("older-data") - StreamTelemetryImpl(context, config(version = "1.0.0"), scope) - Thread.sleep(500) // wait for IO cleanup + val sut = StreamTelemetryImpl(context, config(version = "1.0.0"), scope) + sut.cleanStaleVersions() assertFalse("0.9.0 should be deleted", File(baseDir, "0.9.0").exists()) assertFalse("0.8.0 should be deleted", File(baseDir, "0.8.0").exists()) @@ -146,19 +146,19 @@ class StreamTelemetryImplTest { File(baseDir, "1.0.0/connection").mkdirs() File(baseDir, "1.0.0/connection/spill.bin").writeText("current-data") - StreamTelemetryImpl(context, config(version = "1.0.0"), scope) - Thread.sleep(500) + val sut = StreamTelemetryImpl(context, config(version = "1.0.0"), scope) + sut.cleanStaleVersions() assertTrue("1.0.0 should be kept", File(baseDir, "1.0.0").exists()) assertTrue("spill.bin should be kept", File(baseDir, "1.0.0/connection/spill.bin").exists()) } @Test - fun `init with no existing directories does not crash`() = runBlocking { + fun `cleanStaleVersions with no existing directories does not crash`() = runBlocking { // cacheDir exists but no telemetry dirs - StreamTelemetryImpl(context, config(version = "1.0.0"), scope) - Thread.sleep(500) - // No assertion — just verifying no exception + val sut = StreamTelemetryImpl(context, config(version = "1.0.0"), scope) + val result = sut.cleanStaleVersions() + assertTrue(result.isSuccess) } @Test @@ -168,8 +168,8 @@ class StreamTelemetryImplTest { unrelated.parentFile?.mkdirs() unrelated.writeText("important") - StreamTelemetryImpl(context, config(version = "1.0.0"), scope) - Thread.sleep(500) + val sut = StreamTelemetryImpl(context, config(version = "1.0.0"), scope) + sut.cleanStaleVersions() assertTrue("Unrelated file should survive", unrelated.exists()) } @@ -183,8 +183,8 @@ class StreamTelemetryImplTest { // Create an old version dir File(baseDir, "0.9.0").mkdirs() - StreamTelemetryImpl(context, config(version = "1.0.0"), scope) - Thread.sleep(500) + val sut = StreamTelemetryImpl(context, config(version = "1.0.0"), scope) + sut.cleanStaleVersions() assertTrue("File should survive (not a directory)", File(baseDir, "some-file.txt").exists()) assertFalse("Old version dir should be deleted", File(baseDir, "0.9.0").exists()) @@ -215,8 +215,9 @@ class StreamTelemetryImplTest { // Create old version to verify cleanup uses custom path File(baseDir, "0.9.0").mkdirs() - StreamTelemetryImpl(context, config(version = "1.0.0", basePath = "custom/path"), scope) - Thread.sleep(500) + val sut = + StreamTelemetryImpl(context, config(version = "1.0.0", basePath = "custom/path"), scope) + sut.cleanStaleVersions() assertFalse( "Old version under custom path should be deleted", From 59973bd2afa6d2a33d545615c8929c20e60164f3 Mon Sep 17 00:00:00 2001 From: Aleksandar Apostolov Date: Mon, 27 Apr 2026 14:14:23 +0200 Subject: [PATCH 8/9] fix(telemetry): address PR review feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Source fixes: - StreamSignal.data: Any? → String? — callers pre-serialize, no silent toString() coercion on disk spill (addresses Petar's review + bot #3) - Redactor null return now drops the signal instead of falling back to raw — honors the StreamSignalRedactor KDoc contract (bot #1) - trimDiskIfNeeded O(N²) → O(N) single-pass — compute suffix that fits within diskCapacity and write once (bot #2) - Updated KDocs on StreamSignal, StreamTelemetryScope.emit Test fixes: - FIFO assertion strengthened — assertEquals exact order instead of weak indexOf check (bot #6) - Redactor tests updated for null = drop behavior - Redactor exception test now asserts Result.isFailure + drain success (bot #5) - Custom root test verifies cleanup targets custom root, not cacheDir (bot #4) - setReadOnly tests replaced with file-as-parent barrier for deterministic mkdirs() failure regardless of uid (bot #7) --- .../core/api/model/telemetry/StreamSignal.kt | 5 +- .../core/api/telemetry/StreamTelemetry.kt | 2 +- .../api/telemetry/StreamTelemetryScope.kt | 10 +-- .../telemetry/StreamTelemetryScopeImpl.kt | 24 +++-- .../telemetry/StreamTelemetryImplTest.kt | 21 +++-- .../telemetry/StreamTelemetryScopeImplTest.kt | 88 ++++++++----------- 6 files changed, 75 insertions(+), 75 deletions(-) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/model/telemetry/StreamSignal.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/model/telemetry/StreamSignal.kt index 57b7d26..ca0b5a7 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/model/telemetry/StreamSignal.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/model/telemetry/StreamSignal.kt @@ -23,8 +23,9 @@ import io.getstream.android.core.annotations.StreamInternalApi * * @param tag Identifies what happened (e.g. `"connected"`, `"token.refreshed"`, * `"network.changed"`). - * @param data Arbitrary payload associated with the signal. May be `null` for tag-only signals. + * @param data Pre-serialized payload associated with the signal. Callers are responsible for + * serializing domain objects to a string before emitting. May be `null` for tag-only signals. * @param timestamp Epoch milliseconds when the signal was recorded. */ @StreamInternalApi -public data class StreamSignal(val tag: String, val data: Any?, val timestamp: Long) +public data class StreamSignal(val tag: String, val data: String?, val timestamp: Long) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt index f36bbf8..4850275 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt @@ -110,7 +110,7 @@ public object StreamTelemetryNoOp : StreamTelemetry { private object NoOpScope : StreamTelemetryScope { override val name: String = "noop" - override fun emit(tag: String, data: Any?): Result = Result.success(Unit) + override fun emit(tag: String, data: String?): Result = Result.success(Unit) override suspend fun drain(): Result> = Result.success(emptyList()) } diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt index 22188dd..439c271 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt @@ -50,14 +50,14 @@ public interface StreamTelemetryScope { * Records a signal into this scope. * * The [StreamSignalRedactor] (if configured) runs synchronously before the signal is buffered. - * Failures are silently consumed — this method never throws or affects the caller's flow. + * If the redactor returns `null`, the signal is dropped and not buffered. * * @param tag Identifies what happened. - * @param data Optional payload. Pass `null` for tag-only signals. - * @return [Result.success] if the signal was buffered, or [Result.failure] if an error - * occurred. + * @param data Optional pre-serialized payload. Pass `null` for tag-only signals. + * @return [Result.success] if the signal was buffered (or intentionally dropped by the + * redactor), or [Result.failure] if an error occurred. */ - public fun emit(tag: String, data: Any? = null): Result + public fun emit(tag: String, data: String? = null): Result /** * Atomically consumes all buffered signals, including any that were spilled to disk. diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt index abf7dc5..da6c651 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt @@ -47,7 +47,8 @@ import kotlinx.coroutines.withContext * * When the memory buffer exceeds the configured capacity, the oldest signals are serialized to disk * on [Dispatchers.IO]. If a spill is already in progress, the oldest in-memory signal is dropped - * instead. Disk storage is capped per scope; when exceeded the oldest lines are removed. + * instead. Disk storage is capped per scope; when exceeded the oldest lines are removed in a single + * pass. * * ### Drain order * @@ -71,11 +72,12 @@ internal class StreamTelemetryScopeImpl( private var buffer = mutableListOf() private val spilling = AtomicBoolean(false) - override fun emit(tag: String, data: Any?): Result = + override fun emit(tag: String, data: String?): Result = // runCatching (not cancellable) — emit is not a suspend function. runCatching { val raw = StreamSignal(tag = tag, data = data, timestamp = System.currentTimeMillis()) - val signal = redactor?.redact(raw) ?: raw + val signal = if (redactor != null) redactor.redact(raw) else raw + if (signal == null) return@runCatching // redactor dropped the signal synchronized(lock) { buffer.add(signal) if (buffer.size > memoryCapacity) { @@ -126,11 +128,17 @@ internal class StreamTelemetryScopeImpl( if (!file.exists() || file.length() <= diskCapacity) { return } - val lines = file.readLines().toMutableList() - while (lines.isNotEmpty() && file.length() > diskCapacity) { - lines.removeAt(0) - file.writeText(lines.joinToString(separator = "\n", postfix = "\n")) + val lines = file.readLines() + var bytes = 0L + var startIdx = lines.size + for (i in lines.indices.reversed()) { + val next = bytes + lines[i].toByteArray(Charsets.UTF_8).size + 1 + if (next > diskCapacity) break + bytes = next + startIdx = i } + val kept = lines.subList(startIdx, lines.size) + file.writeText(if (kept.isEmpty()) "" else kept.joinToString("\n", postfix = "\n")) } private suspend fun drainDisk(): List = @@ -148,7 +156,7 @@ internal class StreamTelemetryScopeImpl( private fun encode(signal: StreamSignal): String { val escapedTag = signal.tag.replace(DELIMITER, DELIMITER_ESCAPE) - val escapedData = signal.data?.toString()?.replace(DELIMITER, DELIMITER_ESCAPE).orEmpty() + val escapedData = signal.data?.replace(DELIMITER, DELIMITER_ESCAPE).orEmpty() return "${signal.timestamp}$DELIMITER$escapedTag$DELIMITER$escapedData" } diff --git a/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt index ce7f39e..67e787f 100644 --- a/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt +++ b/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt @@ -195,17 +195,22 @@ class StreamTelemetryImplTest { // ======================================== @Test - fun `custom root overrides cacheDir`() = runBlocking { + fun `custom root overrides cacheDir for cleanup`() = runBlocking { val customRoot = tempDir.newFolder("custom-root") - val sut = StreamTelemetryImpl(context, config(version = "2.0.0", root = customRoot), scope) + // Seed stale version under custom root and cacheDir + val staleUnderCustomRoot = File(customRoot, "stream/telemetry/0.9.0") + staleUnderCustomRoot.mkdirs() + val staleUnderCacheDir = File(cacheDir, "stream/telemetry/0.9.0") + staleUnderCacheDir.mkdirs() - val telemetryScope = sut.scope("test") - telemetryScope.emit("event", null) + val sut = StreamTelemetryImpl(context, config(version = "2.0.0", root = customRoot), scope) + sut.cleanStaleVersions() - // Spill dir should be under custom root - val expectedDir = File(customRoot, "stream/telemetry/2.0.0/test") - // Even if no spill happened, the scope should use this path - assertEquals("test", telemetryScope.name) + assertFalse("Stale under custom root should be deleted", staleUnderCustomRoot.exists()) + assertTrue( + "cacheDir should not be touched when custom root is set", + staleUnderCacheDir.exists(), + ) } @Test diff --git a/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt index 83ed6a8..27722b2 100644 --- a/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt +++ b/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt @@ -120,14 +120,13 @@ class StreamTelemetryScopeImplTest { } @Test - fun `emit with complex data`() = runBlocking { + fun `emit with string data`() = runBlocking { val sut = createScope() - val data = mapOf("key" to "value", "count" to 42) - sut.emit("event", data) + sut.emit("event", """{"key":"value","count":42}""") val signals = sut.drain().getOrThrow() assertEquals(1, signals.size) - assertEquals(data, signals[0].data) + assertEquals("""{"key":"value","count":42}""", signals[0].data) } @Test @@ -225,16 +224,29 @@ class StreamTelemetryScopeImplTest { } @Test - fun `redactor returning null falls back to raw signal`() = runBlocking { - // val signal = redactor?.redact(raw) ?: raw — null falls back to raw + fun `redactor returning null drops the signal`() = runBlocking { val redactor = StreamSignalRedactor { null } val sut = createScope(redactor = redactor) sut.emit("event", "data") + val signals = sut.drain().getOrThrow() + assertTrue("Signal should be dropped when redactor returns null", signals.isEmpty()) + } + + @Test + fun `redactor selectively drops signals`() = runBlocking { + val redactor = StreamSignalRedactor { signal -> + if (signal.tag == "debug") null else signal + } + val sut = createScope(redactor = redactor) + + sut.emit("debug", "verbose info") + sut.emit("connected", "user-123") + val signals = sut.drain().getOrThrow() assertEquals(1, signals.size) - assertEquals("event", signals[0].tag) + assertEquals("connected", signals[0].tag) } @Test @@ -248,12 +260,15 @@ class StreamTelemetryScopeImplTest { } @Test - fun `redactor exception does not crash emit`() { + fun `redactor exception does not crash emit and returns failure`() = runBlocking { val redactor = StreamSignalRedactor { throw RuntimeException("redactor boom") } val sut = createScope(redactor = redactor) - sut.emit("event", null) - // No exception thrown + val emitResult = sut.emit("event", null) + assertTrue("emit should capture redactor failure in Result", emitResult.isFailure) + + val drainResult = sut.drain() + assertTrue("drain should still succeed", drainResult.isSuccess) } @Test @@ -308,12 +323,8 @@ class StreamTelemetryScopeImplTest { waitForSpill() - val signals = sut.drain().getOrThrow() - val tags = signals.map { it.tag } - assertTrue( - "Expected disk signals before memory, got: $tags", - tags.indexOf("a") < tags.indexOf("d"), - ) + val tags = sut.drain().getOrThrow().map { it.tag } + assertEquals(listOf("a", "b", "c", "d"), tags) } @Test @@ -438,17 +449,13 @@ class StreamTelemetryScopeImplTest { val dir = tempDir.newFolder() val sut = createScope(memoryCapacity = 3, dir = dir) - // Emit enough to trigger spill repeat(4) { sut.emit("event-$it", null) } - // Immediately drain — spill may or may not have finished - // With the disk Mutex, drain waits for spill to complete before reading Thread.sleep(100) val result = sut.drain() assertTrue("drain should succeed", result.isSuccess) val signals = result.getOrThrow() - // All 4 signals should be accounted for (either from disk or memory) assertEquals(4, signals.size) } @@ -457,7 +464,6 @@ class StreamTelemetryScopeImplTest { val dir = tempDir.newFolder() val sut = createScope(memoryCapacity = 1, dir = dir) - // Rapid-fire 200 emits — each pair triggers a spill repeat(200) { sut.emit("event-$it", null) } waitForSpill() @@ -466,7 +472,6 @@ class StreamTelemetryScopeImplTest { val signals = result.getOrThrow() assertTrue("Expected signals, got ${signals.size}", signals.isNotEmpty()) - // All signals should have valid structure signals.forEach { signal -> assertTrue("Tag should start with event-", signal.tag.startsWith("event-")) assertTrue("Timestamp should be positive", signal.timestamp > 0) @@ -481,7 +486,6 @@ class StreamTelemetryScopeImplTest { val allDrainedSignals = mutableListOf() val drainFailures = AtomicInteger(0) - // Emit rapidly in background val emitJob = scope.launch { var i = 0 @@ -491,7 +495,6 @@ class StreamTelemetryScopeImplTest { } } - // Drain 30 times concurrently with emit repeat(30) { Thread.sleep(10) val result = sut.drain() @@ -506,7 +509,6 @@ class StreamTelemetryScopeImplTest { emitJob.cancel() waitForSpill() - // Final drain to get remaining signals sut.drain().onSuccess { remaining -> synchronized(allDrainedSignals) { allDrainedSignals.addAll(remaining) } } @@ -514,7 +516,6 @@ class StreamTelemetryScopeImplTest { assertTrue("Should have collected some signals", allDrainedSignals.isNotEmpty()) assertEquals("No drain should have failed", 0, drainFailures.get()) - // No duplicates — each drain clears the buffer val uniqueTimestampTagPairs = allDrainedSignals.map { "${it.timestamp}-${it.tag}" }.toSet() assertEquals( "No duplicate signals expected", @@ -528,15 +529,12 @@ class StreamTelemetryScopeImplTest { val dir = tempDir.newFolder() val sut = createScope(memoryCapacity = 5, dir = dir) - // Pre-fill with signals that will spill repeat(20) { sut.emit("event-$it", null) } waitForSpill() - // Launch 10 concurrent drains val results = (1..10).map { scope.launch { sut.drain() } } results.forEach { it.join() } - // After all drains, scope should be empty val remaining = sut.drain().getOrThrow() assertTrue("Scope should be empty after concurrent drains", remaining.isEmpty()) } @@ -547,21 +545,17 @@ class StreamTelemetryScopeImplTest { val sut = createScope(memoryCapacity = 2, dir = dir) val allSignals = mutableListOf() - // 50 cycles of: emit enough to spill, then drain repeat(50) { cycle -> repeat(3) { sut.emit("cycle-$cycle-event-$it", null) } - Thread.sleep(50) // let spill run + Thread.sleep(50) sut.drain().onSuccess { signals -> synchronized(allSignals) { allSignals.addAll(signals) } } } - // Wait for any remaining spills waitForSpill() sut.drain().onSuccess { signals -> synchronized(allSignals) { allSignals.addAll(signals) } } - // We emitted 150 signals total (50 * 3) - // Some may be lost due to drop-oldest under pressure, but most should survive assertTrue("Expected most of 150 signals, got ${allSignals.size}", allSignals.size >= 100) } @@ -583,11 +577,10 @@ class StreamTelemetryScopeImplTest { } @Test - fun `emit does not throw when spill directory is read-only`() = runBlocking { - val readOnlyDir = tempDir.newFolder("readonly") - readOnlyDir.setReadOnly() - - val sut = createScope(memoryCapacity = 2, dir = File(readOnlyDir, "nested")) + fun `emit succeeds when spill directory cannot be created`() = runBlocking { + // Use a file as parent so mkdirs() fails deterministically (not uid-dependent) + val blocker = tempDir.newFile("blocker-file") + val sut = createScope(memoryCapacity = 2, dir = File(blocker, "nested")) repeat(5) { sut.emit("event-$it", null) } waitForSpill() @@ -819,9 +812,6 @@ class StreamTelemetryScopeImplTest { waitForSpill() val signals = sut.drain().getOrThrow() - // Spilled signals are trimmed to 0 bytes, but memory buffer survives. - // With rapid emits and spill resets, count varies — key assertion is that - // we get fewer than total emitted (some were lost to disk trim). assertTrue("Expected fewer than 5 due to disk trim, got ${signals.size}", signals.size < 5) } @@ -891,20 +881,19 @@ class StreamTelemetryScopeImplTest { } // ======================================== - // Disk full / permission failures + // Disk write failure (deterministic) // ======================================== @Test - fun `emit survives when spill dir is read-only`() = runBlocking { - val readOnlySpill = tempDir.newFolder("readonly-spill") - readOnlySpill.setReadOnly() - + fun `emit survives when spill dir parent is a file`() = runBlocking { + // A file as parent makes mkdirs() fail deterministically regardless of uid + val blocker = tempDir.newFile("blocker") val sut = StreamTelemetryScopeImpl( name = "test", memoryCapacity = 2, diskCapacity = 1_000_000L, - spillDir = File(readOnlySpill, "nested"), + spillDir = File(blocker, "nested"), redactor = null, scope = scope, ) @@ -1068,9 +1057,6 @@ class StreamTelemetryScopeImplTest { val dir = tempDir.newFolder() val sut = createScope(memoryCapacity = 1, dir = dir) - // The literal string "\t" (backslash + t) collides with the delimiter escape sequence. - // After a spill roundtrip, decode converts "\\t" back to a real tab character. - // This documents the known limitation. sut.emit("tag\\twith\\tescape", "data") sut.emit("trigger", null) waitForSpill() From 1cbed5f4d61adc78d24926057c7ed0e2a5b2a289 Mon Sep 17 00:00:00 2001 From: Aleksandar Apostolov Date: Mon, 27 Apr 2026 16:30:32 +0200 Subject: [PATCH 9/9] fix(telemetry): inject IO dispatcher, handle file.delete() result MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace hardcoded Dispatchers.IO with injectable ioDispatcher param (defaults to Dispatchers.IO, testable with test dispatchers) - Handle file.delete() return value — fallback to writeText("") if delete fails --- .../internal/telemetry/StreamTelemetryScopeImpl.kt | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt index da6c651..3ca6df4 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt @@ -22,6 +22,7 @@ import io.getstream.android.core.api.telemetry.StreamTelemetryScope import io.getstream.android.core.api.utils.runCatchingCancellable import java.io.File import java.util.concurrent.atomic.AtomicBoolean +import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch @@ -62,6 +63,7 @@ internal class StreamTelemetryScopeImpl( private val spillDir: File, private val redactor: StreamSignalRedactor?, private val scope: CoroutineScope, + private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO, ) : StreamTelemetryScope { private val lock = Any() @@ -84,7 +86,7 @@ internal class StreamTelemetryScopeImpl( if (spilling.compareAndSet(false, true)) { val snapshot = buffer buffer = mutableListOf() - scope.launch(Dispatchers.IO) { spillToDisk(snapshot) } + scope.launch(ioDispatcher) { spillToDisk(snapshot) } } else { buffer.removeAt(0) } @@ -98,7 +100,7 @@ internal class StreamTelemetryScopeImpl( memorySnapshot = buffer buffer = mutableListOf() } - val diskSignals = withContext(Dispatchers.IO) { drainDisk() } + val diskSignals = withContext(ioDispatcher) { drainDisk() } if (diskSignals.isEmpty()) { memorySnapshot } else { @@ -148,7 +150,9 @@ internal class StreamTelemetryScopeImpl( return@withLock emptyList() } val signals = file.readLines().mapNotNull { decode(it) } - file.delete() + if (!file.delete()) { + file.writeText("") + } signals }