Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,6 @@ open class CombineEndpoint(
.reduce { acc, iEndpointInfo -> acc intersect iEndpointInfo }
}

/**
* Throws [UnsupportedOperationException] because [CombineEndpoint] does not have metrics.
*
* Call [IEndpoint.metrics] on each endpoint to get their metrics.
*/
override val metrics: Any
get() = throw UnsupportedOperationException("CombineEndpoint does not have metrics.")

private fun createNewStreamId(): Int {
var i = 0
while (endpointsToStreamIdsMap.keys.any { it.second == i }) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ class DummyEndpoint : IEndpointInternal {
TODO("Not yet implemented")
}

override val metrics: Any
get() = TODO("Not yet implemented")
override val throwableFlow: StateFlow<Throwable?> = MutableStateFlow(null).asStateFlow()

override suspend fun open(descriptor: MediaDescriptor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxer
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.ContentSink
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.FileSink
import io.github.thibaultbee.streampack.core.elements.utils.ConflatedJob
import io.github.thibaultbee.streampack.core.elements.interfaces.WithMetrics
import io.github.thibaultbee.streampack.core.logger.Logger
import io.github.thibaultbee.streampack.core.pipelines.IDispatcherProvider
import kotlinx.coroutines.CoroutineDispatcher
Expand All @@ -48,7 +49,7 @@ open class DynamicEndpoint(
private val context: Context,
private val defaultDispatcher: CoroutineDispatcher,
private val ioDispatcher: CoroutineDispatcher
) : IEndpointInternal {
) : IEndpointInternal, WithMetrics {
private val coroutineScope = CoroutineScope(defaultDispatcher)
private val mutex = Mutex()

Expand Down Expand Up @@ -85,7 +86,11 @@ open class DynamicEndpoint(
override fun getInfo(type: MediaDescriptor.Type) = getEndpoint(type).getInfo(type)

override val metrics: Any
get() = endpoint?.metrics ?: throw IllegalStateException("Endpoint is not opened")
get() {
val endpoint = endpoint ?: throw IllegalStateException("Endpoint is not opened")
return (endpoint as? WithMetrics)?.metrics
?: throw UnsupportedOperationException("Current endpoint does not support metrics")
}

init {
coroutineScope.launch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,4 @@ interface IEndpoint {
val supportedEncoders: List<String>
}
}

/**
* Metrics of the endpoint.
*/
val metrics: Any
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ class MediaMuxerEndpoint(

override fun getInfo(type: MediaDescriptor.Type) = Companion.getInfo(type)

override val metrics: Any
get() = TODO("Not yet implemented")

private val _isOpenFlow = MutableStateFlow(false)
override val isOpenFlow = _isOpenFlow.asStateFlow()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ class CompositeEndpoint(
override val info by lazy { EndpointInfo(muxer.info) }
override fun getInfo(type: MediaDescriptor.Type) = info

override val metrics: Any
get() = sink.metrics

init {
muxer.listener = object :
IMuxerInternal.IMuxerListener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ import io.github.thibaultbee.streampack.core.logger.Logger
abstract class AbstractSink : ISinkInternal {
abstract val supportedSinkTypes: List<MediaSinkType>

override val metrics: Any
get() = TODO("Not yet implemented")

override suspend fun open(mediaDescriptor: MediaDescriptor) {
if (isOpenFlow.value) {
Logger.w(TAG, "Sink is already opened")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,4 @@ interface ISink {
* For example, if the file is opened for [FileSink].
*/
val isOpenFlow: StateFlow<Boolean>

/**
* Metrics of the sink.
*/
val metrics: Any
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (C) 2026 Thibault B.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.thibaultbee.streampack.core.elements.interfaces

interface WithMetrics {
/**
* Metrics of the element.
*/
val metrics: Any
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,14 @@ import java.io.Closeable
*/
class ChannelWithCloseableData<T>(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((T) -> Unit) = {}
) : ReceiveChannel<ChannelWithCloseableData.CloseableData<T>> {
private val channel =
Channel<CloseableData<T>>(capacity, onBufferOverflow, onUndeliveredElement = { it.close() })
Channel<CloseableData<T>>(capacity, onBufferOverflow, onUndeliveredElement = {
it.close()
onUndeliveredElement(it.data)
})

/**
* Sends data along with a close action to the channel.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfi
*/
interface IBitrateRegulator {
/**
* Calls regularly to get new stats
* Calls regularly to get new metrics
*
* @param stats transmission stats
* @param metrics transmission metrics
* @param currentVideoBitrate current video bitrate target in bits/s.
* @param currentAudioBitrate current audio bitrate target in bits/s.
*/
fun update(stats: Any, currentVideoBitrate: Int, currentAudioBitrate: Int)
fun update(metrics: Any, currentVideoBitrate: Int, currentAudioBitrate: Int)

/**
* Factory interface you must use to create a [BitrateRegulator] object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package io.github.thibaultbee.streampack.core.regulator.controllers

import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig
import io.github.thibaultbee.streampack.core.elements.encoders.IEncoder
import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpoint
import io.github.thibaultbee.streampack.core.elements.interfaces.WithMetrics
import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IEncodingPipelineOutput
import io.github.thibaultbee.streampack.core.regulator.IBitrateRegulator
import kotlinx.coroutines.CoroutineDispatcher
Expand All @@ -27,14 +27,14 @@ import kotlinx.coroutines.CoroutineDispatcher
*
* @param audioEncoder the audio [IEncoder]
* @param videoEncoder the video [IEncoder]
* @param endpoint the [IEndpoint] implementation
* @param endpoint the [WithMetrics] implementation
* @param bitrateRegulatorFactory the [IBitrateRegulator.Factory] implementation. Use it to make your own bitrate regulator.
* @param bitrateRegulatorConfig bitrate regulator configuration
*/
abstract class BitrateRegulatorController(
private val audioEncoder: IEncoder?,
private val videoEncoder: IEncoder?,
private val endpoint: IEndpoint,
private val endpoint: WithMetrics,
private val bitrateRegulatorFactory: IBitrateRegulator.Factory,
private val bitrateRegulatorConfig: BitrateRegulatorConfig = BitrateRegulatorConfig()
) : IBitrateRegulatorController {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package io.github.thibaultbee.streampack.core.regulator.controllers

import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig
import io.github.thibaultbee.streampack.core.elements.encoders.IEncoder
import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpoint
import io.github.thibaultbee.streampack.core.elements.interfaces.WithMetrics
import io.github.thibaultbee.streampack.core.elements.utils.CoroutineScheduler
import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IConfigurableAudioEncodingPipelineOutput
import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IConfigurableVideoEncodingPipelineOutput
Expand All @@ -30,23 +30,23 @@ import kotlinx.coroutines.CoroutineDispatcher
*
* @param audioEncoder the audio [IEncoder]
* @param videoEncoder the video [IEncoder]
* @param endpoint the [IEndpoint] implementation
* @param metricsProvider the [WithMetrics] implementation
* @param bitrateRegulatorFactory the [IBitrateRegulator.Factory] implementation. Use it to make your own bitrate regulator.
* @param bitrateRegulatorConfig bitrate regulator configuration
* @param pollingTimeInMs delay between each call to [IBitrateRegulator.update]
*/
open class SimpleBitrateRegulatorController(
open class IntervalBitrateRegulatorController(
audioEncoder: IEncoder?,
videoEncoder: IEncoder,
endpoint: IEndpoint,
metricsProvider: WithMetrics,
bitrateRegulatorFactory: IBitrateRegulator.Factory,
coroutineDispatcher: CoroutineDispatcher,
bitrateRegulatorConfig: BitrateRegulatorConfig = BitrateRegulatorConfig(),
pollingTimeInMs: Long = DEFAULT_POLLING_TIME_IN_MS
) : BitrateRegulatorController(
audioEncoder,
videoEncoder,
endpoint,
metricsProvider,
bitrateRegulatorFactory,
bitrateRegulatorConfig
) {
Expand All @@ -66,7 +66,7 @@ open class SimpleBitrateRegulatorController(
*/
private val scheduler = CoroutineScheduler(pollingTimeInMs, coroutineDispatcher) {
bitrateRegulator.update(
endpoint.metrics,
metricsProvider.metrics,
videoEncoder.bitrate,
audioEncoder?.bitrate ?: 0
)
Expand Down Expand Up @@ -106,10 +106,11 @@ open class SimpleBitrateRegulatorController(
} else {
null
}
return SimpleBitrateRegulatorController(
val endpoint = pipelineOutput.endpoint as WithMetrics
return IntervalBitrateRegulatorController(
audioEncoder,
videoEncoder,
pipelineOutput.endpoint,
endpoint,
bitrateRegulatorFactory,
coroutineDispatcher,
bitrateRegulatorConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,27 +164,79 @@ class DataStoreRepository(
EndpointType.RTMP -> {
val url =
preferences[stringPreferencesKey(context.getString(R.string.rtmp_server_url_key))]
?: context.getString(R.string.default_rtmp_url)
?: context.getString(R.string.rtmp_default_url)
UriMediaDescriptor(context, url)
}
}
}.distinctUntilChanged()

val bitrateRegulatorConfigFlow: Flow<BitrateRegulatorConfig?> =
dataStore.data.map { preferences ->
val endpointTypeId =
preferences[stringPreferencesKey(context.getString(R.string.endpoint_type_key))]?.toInt()
?: EndpointType.SRT.id

val isBitrateRegulatorEnable =
preferences[booleanPreferencesKey(context.getString(R.string.srt_server_enable_bitrate_regulation_key))]
preferences[booleanPreferencesKey(
context.getString(
when (endpointTypeId) {
EndpointType.SRT.id -> {
R.string.srt_server_enable_bitrate_regulation_key
}

EndpointType.RTMP.id -> {
R.string.rtmp_server_enable_bitrate_regulation_key
}

else -> {
throw IllegalArgumentException("Unknown endpoint type")
}
}
)
)]
?: true
if (!isBitrateRegulatorEnable) {
return@map null
}

val videoMinBitrate =
preferences[intPreferencesKey(context.getString(R.string.srt_server_video_min_bitrate_key))]?.toInt()
preferences[intPreferencesKey(
context.getString(
when (endpointTypeId) {
EndpointType.SRT.id -> {
R.string.srt_server_video_min_bitrate_key
}

EndpointType.RTMP.id -> {
R.string.rtmp_server_video_min_bitrate_key
}

else -> {
throw IllegalArgumentException("Unknown endpoint type")
}
}
)
)]
?.times(1000)
?: 300000
val videoMaxBitrate =
preferences[intPreferencesKey(context.getString(R.string.srt_server_video_target_bitrate_key))]?.toInt()
preferences[intPreferencesKey(
context.getString(
when (endpointTypeId) {
EndpointType.SRT.id -> {
R.string.srt_server_video_target_bitrate_key
}

EndpointType.RTMP.id -> {
R.string.rtmp_server_video_target_bitrate_key
}

else -> {
throw IllegalArgumentException("Unknown endpoint type")
}
}
)
)]
?.times(1000)
?: 10000000
BitrateRegulatorConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ import io.github.thibaultbee.streampack.core.streamers.single.VideoOnlySingleStr
import io.github.thibaultbee.streampack.core.streamers.single.withAudio
import io.github.thibaultbee.streampack.core.streamers.single.withVideo
import io.github.thibaultbee.streampack.core.utils.extensions.isClosedException
import io.github.thibaultbee.streampack.ext.srt.regulator.controllers.simpleSrtBitrateRegulatorControllerFactory
import io.github.thibaultbee.streampack.ext.rtmp.regulator.controllers.intervalRtmpBitrateRegulatorControllerFactory
import io.github.thibaultbee.streampack.ext.srt.regulator.controllers.intervalSrtBitrateRegulatorControllerFactory
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
Expand Down Expand Up @@ -312,16 +313,32 @@ class PreviewViewModel(private val application: Application) : ObservableViewMod
val descriptor = storageRepository.endpointDescriptorFlow.first()
streamer.startStream(descriptor)

if (descriptor.type.sinkType == MediaSinkType.SRT) {
if ((descriptor.type.sinkType == MediaSinkType.RTMP) || (descriptor.type.sinkType == MediaSinkType.SRT)) {
val bitrateRegulatorConfig =
storageRepository.bitrateRegulatorConfigFlow.first()
if (bitrateRegulatorConfig != null) {
Log.i(TAG, "Add bitrate regulator controller")
streamer.addBitrateRegulatorController(
simpleSrtBitrateRegulatorControllerFactory(
bitrateRegulatorConfig = bitrateRegulatorConfig
)
)
val controllerFactory =
when (descriptor.type.sinkType) {
MediaSinkType.RTMP -> {
intervalRtmpBitrateRegulatorControllerFactory(
bitrateRegulatorConfig = bitrateRegulatorConfig
)
}

MediaSinkType.SRT -> {
intervalSrtBitrateRegulatorControllerFactory(
bitrateRegulatorConfig = bitrateRegulatorConfig
)
}

else -> {
null
}
}
controllerFactory?.let {
streamer.addBitrateRegulatorController(it)
} ?: Log.e(TAG, "Controller factory is null")
}
}
} catch (e: CancellationException) {
Expand Down
Loading
Loading