From 13727809e00c3922469e743fb4cd892e991cf03c Mon Sep 17 00:00:00 2001 From: ThibaultBee <37510686+ThibaultBee@users.noreply.github.com> Date: Mon, 13 Apr 2026 11:53:42 +0200 Subject: [PATCH 1/5] feat(*): add metrics and regulator for RTMP --- .../utils/ChannelWithCloseableData.kt | 8 +- .../core/regulator/IBitrateRegulator.kt | 6 +- .../app/data/storage/DataStoreRepository.kt | 60 +++++++++- .../app/ui/main/PreviewViewModel.kt | 30 ++++- .../app/ui/settings/SettingsFragment.kt | 70 +++++++++--- demos/camera/src/main/res/values/strings.xml | 9 +- .../src/main/res/xml/root_preferences.xml | 24 +++- extensions/rtmp/build.gradle.kts | 2 +- .../rtmp/elements/endpoints/RtmpEndpoint.kt | 65 ++++++++++- .../rtmp/regulator/RtmpBitrateRegulator.kt | 74 ++++++++++++ .../regulator/SimpleRtmpBitrateRegulator.kt | 106 ++++++++++++++++++ ...RtmpBitrateRegulatorControllerFactories.kt | 40 +++++++ .../srt/regulator/DummySrtBitrateRegulator.kt | 14 +-- .../ext/srt/regulator/SrtBitrateRegulator.kt | 10 +- ...SrtBitrateRegulatorControllerFactories.kt} | 0 gradle/libs.versions.toml | 2 +- settings.gradle.kts | 1 + 17 files changed, 467 insertions(+), 54 deletions(-) create mode 100644 extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/RtmpBitrateRegulator.kt create mode 100644 extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/SimpleRtmpBitrateRegulator.kt create mode 100644 extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/controllers/RtmpBitrateRegulatorControllerFactories.kt rename extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/{DefaultSrtBitrateRegulatorController.kt => SrtBitrateRegulatorControllerFactories.kt} (100%) diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/ChannelWithCloseableData.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/ChannelWithCloseableData.kt index 130cf4450..35a803271 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/ChannelWithCloseableData.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/ChannelWithCloseableData.kt @@ -34,10 +34,14 @@ import java.io.Closeable */ class ChannelWithCloseableData( capacity: Int = RENDEZVOUS, - onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND + onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, + onUndeliveredElement: ((T) -> Unit) = {} ) : ReceiveChannel> { private val channel = - Channel>(capacity, onBufferOverflow, onUndeliveredElement = { it.close() }) + Channel>(capacity, onBufferOverflow, onUndeliveredElement = { + it.close() + onUndeliveredElement(it.data) + }) /** * Sends data along with a close action to the channel. diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/IBitrateRegulator.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/IBitrateRegulator.kt index 367af6973..4cd1a5a11 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/IBitrateRegulator.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/IBitrateRegulator.kt @@ -22,13 +22,13 @@ import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfi */ interface IBitrateRegulator { /** - * Calls regularly to get new stats + * Calls regularly to get new metrics * - * @param stats transmission stats + * @param metrics transmission metrics * @param currentVideoBitrate current video bitrate target in bits/s. * @param currentAudioBitrate current audio bitrate target in bits/s. */ - fun update(stats: Any, currentVideoBitrate: Int, currentAudioBitrate: Int) + fun update(metrics: Any, currentVideoBitrate: Int, currentAudioBitrate: Int) /** * Factory interface you must use to create a [BitrateRegulator] object. diff --git a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/data/storage/DataStoreRepository.kt b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/data/storage/DataStoreRepository.kt index ddab33b3d..a170631d3 100644 --- a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/data/storage/DataStoreRepository.kt +++ b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/data/storage/DataStoreRepository.kt @@ -164,7 +164,7 @@ class DataStoreRepository( EndpointType.RTMP -> { val url = preferences[stringPreferencesKey(context.getString(R.string.rtmp_server_url_key))] - ?: context.getString(R.string.default_rtmp_url) + ?: context.getString(R.string.rtmp_default_url) UriMediaDescriptor(context, url) } } @@ -172,19 +172,71 @@ class DataStoreRepository( val bitrateRegulatorConfigFlow: Flow = dataStore.data.map { preferences -> + val endpointTypeId = + preferences[stringPreferencesKey(context.getString(R.string.endpoint_type_key))]?.toInt() + ?: EndpointType.SRT.id + val isBitrateRegulatorEnable = - preferences[booleanPreferencesKey(context.getString(R.string.srt_server_enable_bitrate_regulation_key))] + preferences[booleanPreferencesKey( + context.getString( + when (endpointTypeId) { + EndpointType.SRT.id -> { + R.string.srt_server_enable_bitrate_regulation_key + } + + EndpointType.RTMP.id -> { + R.string.rtmp_server_enable_bitrate_regulation_key + } + + else -> { + throw IllegalArgumentException("Unknown endpoint type") + } + } + ) + )] ?: true if (!isBitrateRegulatorEnable) { return@map null } val videoMinBitrate = - preferences[intPreferencesKey(context.getString(R.string.srt_server_video_min_bitrate_key))]?.toInt() + preferences[intPreferencesKey( + context.getString( + when (endpointTypeId) { + EndpointType.SRT.id -> { + R.string.srt_server_video_min_bitrate_key + } + + EndpointType.RTMP.id -> { + R.string.rtmp_server_video_min_bitrate_key + } + + else -> { + throw IllegalArgumentException("Unknown endpoint type") + } + } + ) + )] ?.times(1000) ?: 300000 val videoMaxBitrate = - preferences[intPreferencesKey(context.getString(R.string.srt_server_video_target_bitrate_key))]?.toInt() + preferences[intPreferencesKey( + context.getString( + when (endpointTypeId) { + EndpointType.SRT.id -> { + R.string.srt_server_video_target_bitrate_key + } + + EndpointType.RTMP.id -> { + R.string.rtmp_server_video_target_bitrate_key + } + + else -> { + throw IllegalArgumentException("Unknown endpoint type") + } + } + ) + )] ?.times(1000) ?: 10000000 BitrateRegulatorConfig( diff --git a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/main/PreviewViewModel.kt b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/main/PreviewViewModel.kt index 0b3dbd592..35d331522 100644 --- a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/main/PreviewViewModel.kt +++ b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/main/PreviewViewModel.kt @@ -65,6 +65,7 @@ import io.github.thibaultbee.streampack.core.streamers.single.VideoOnlySingleStr import io.github.thibaultbee.streampack.core.streamers.single.withAudio import io.github.thibaultbee.streampack.core.streamers.single.withVideo import io.github.thibaultbee.streampack.core.utils.extensions.isClosedException +import io.github.thibaultbee.streampack.ext.rtmp.regulator.controllers.simpleRtmpBitrateRegulatorControllerFactory import io.github.thibaultbee.streampack.ext.srt.regulator.controllers.simpleSrtBitrateRegulatorControllerFactory import kotlinx.coroutines.CancellationException import kotlinx.coroutines.Dispatchers @@ -77,6 +78,7 @@ import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filterNotNull import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.map +import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex @@ -312,16 +314,32 @@ class PreviewViewModel(private val application: Application) : ObservableViewMod val descriptor = storageRepository.endpointDescriptorFlow.first() streamer.startStream(descriptor) - if (descriptor.type.sinkType == MediaSinkType.SRT) { + if ((descriptor.type.sinkType == MediaSinkType.RTMP) || (descriptor.type.sinkType == MediaSinkType.SRT)) { val bitrateRegulatorConfig = storageRepository.bitrateRegulatorConfigFlow.first() if (bitrateRegulatorConfig != null) { Log.i(TAG, "Add bitrate regulator controller") - streamer.addBitrateRegulatorController( - simpleSrtBitrateRegulatorControllerFactory( - bitrateRegulatorConfig = bitrateRegulatorConfig - ) - ) + val controllerFactory = + when (descriptor.type.sinkType) { + MediaSinkType.RTMP -> { + simpleRtmpBitrateRegulatorControllerFactory( + bitrateRegulatorConfig = bitrateRegulatorConfig + ) + } + + MediaSinkType.SRT -> { + simpleSrtBitrateRegulatorControllerFactory( + bitrateRegulatorConfig = bitrateRegulatorConfig + ) + } + + else -> { + null + } + } + controllerFactory?.let { + streamer.addBitrateRegulatorController(it) + } ?: Log.e(TAG, "Controller factory is null") } } } catch (e: CancellationException) { diff --git a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/settings/SettingsFragment.kt b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/settings/SettingsFragment.kt index 2a8079eda..04a3b1b1b 100644 --- a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/settings/SettingsFragment.kt +++ b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/settings/SettingsFragment.kt @@ -131,18 +131,30 @@ class SettingsFragment : PreferenceFragmentCompat() { this.findPreference(getString(R.string.srt_server_port_key))!! } - private val serverEnableBitrateRegulationPreference: SwitchPreference by lazy { + private val srtServerEnableBitrateRegulationPreference: SwitchPreference by lazy { this.findPreference(getString(R.string.srt_server_enable_bitrate_regulation_key))!! } - private val serverTargetVideoBitratePreference: SeekBarPreference by lazy { + private val srtServerTargetVideoBitratePreference: SeekBarPreference by lazy { this.findPreference(getString(R.string.srt_server_video_target_bitrate_key))!! } - private val serverMinVideoBitratePreference: SeekBarPreference by lazy { + private val srtServerMinVideoBitratePreference: SeekBarPreference by lazy { this.findPreference(getString(R.string.srt_server_video_min_bitrate_key))!! } + private val rtmpServerEnableBitrateRegulationPreference: SwitchPreference by lazy { + this.findPreference(getString(R.string.rtmp_server_enable_bitrate_regulation_key))!! + } + + private val rtmpServerTargetVideoBitratePreference: SeekBarPreference by lazy { + this.findPreference(getString(R.string.rtmp_server_video_target_bitrate_key))!! + } + + private val rtmpServerMinVideoBitratePreference: SeekBarPreference by lazy { + this.findPreference(getString(R.string.rtmp_server_video_min_bitrate_key))!! + } + private val fileNamePreference: EditTextPreference by lazy { this.findPreference(getString(R.string.file_name_key))!! } @@ -417,26 +429,50 @@ class SettingsFragment : PreferenceFragmentCompat() { editText.filters = arrayOf(InputFilter.LengthFilter(5)) } - serverTargetVideoBitratePreference.isVisible = - serverEnableBitrateRegulationPreference.isChecked - serverMinVideoBitratePreference.isVisible = - serverEnableBitrateRegulationPreference.isChecked - serverEnableBitrateRegulationPreference.setOnPreferenceChangeListener { _, newValue -> - serverTargetVideoBitratePreference.isVisible = newValue as Boolean - serverMinVideoBitratePreference.isVisible = newValue + srtServerTargetVideoBitratePreference.isVisible = + srtServerEnableBitrateRegulationPreference.isChecked + srtServerMinVideoBitratePreference.isVisible = + srtServerEnableBitrateRegulationPreference.isChecked + srtServerEnableBitrateRegulationPreference.setOnPreferenceChangeListener { _, newValue -> + srtServerTargetVideoBitratePreference.isVisible = newValue as Boolean + srtServerMinVideoBitratePreference.isVisible = newValue + true + } + + srtServerTargetVideoBitratePreference.setOnPreferenceChangeListener { _, newValue -> + if ((newValue as Int) < srtServerMinVideoBitratePreference.value) { + srtServerMinVideoBitratePreference.value = newValue + } + true + } + + srtServerMinVideoBitratePreference.setOnPreferenceChangeListener { _, newValue -> + if ((newValue as Int) > srtServerTargetVideoBitratePreference.value) { + srtServerTargetVideoBitratePreference.value = newValue + } + true + } + + rtmpServerTargetVideoBitratePreference.isVisible = + rtmpServerEnableBitrateRegulationPreference.isChecked + rtmpServerMinVideoBitratePreference.isVisible = + rtmpServerEnableBitrateRegulationPreference.isChecked + rtmpServerEnableBitrateRegulationPreference.setOnPreferenceChangeListener { _, newValue -> + rtmpServerTargetVideoBitratePreference.isVisible = newValue as Boolean + rtmpServerMinVideoBitratePreference.isVisible = newValue true } - serverTargetVideoBitratePreference.setOnPreferenceChangeListener { _, newValue -> - if ((newValue as Int) < serverMinVideoBitratePreference.value) { - serverMinVideoBitratePreference.value = newValue + rtmpServerTargetVideoBitratePreference.setOnPreferenceChangeListener { _, newValue -> + if ((newValue as Int) < rtmpServerMinVideoBitratePreference.value) { + rtmpServerMinVideoBitratePreference.value = newValue } true } - serverMinVideoBitratePreference.setOnPreferenceChangeListener { _, newValue -> - if ((newValue as Int) > serverTargetVideoBitratePreference.value) { - serverTargetVideoBitratePreference.value = newValue + rtmpServerMinVideoBitratePreference.setOnPreferenceChangeListener { _, newValue -> + if ((newValue as Int) > rtmpServerTargetVideoBitratePreference.value) { + rtmpServerTargetVideoBitratePreference.value = newValue } true } @@ -459,7 +495,7 @@ class SettingsFragment : PreferenceFragmentCompat() { // Update file extension if (endpoint.hasFileCapabilities) { // Remove previous extension - FileExtension.entries.forEach { + FileExtension.entries.forEach { _ -> fileNamePreference.text = fileNamePreference.text?.substringBeforeLast(".") } // Add correct extension diff --git a/demos/camera/src/main/res/values/strings.xml b/demos/camera/src/main/res/values/strings.xml index 1a8393963..1fa81cee8 100644 --- a/demos/camera/src/main/res/values/strings.xml +++ b/demos/camera/src/main/res/values/strings.xml @@ -127,7 +127,14 @@ rtmp_server_key RTMP Server rtmp_server_url_key - rtmp://192.168.1.192/s/streamKey + rtmp://192.168.1.192/s/streamKey + rtmp_server_enable_bitrate_regulation_key + Enable bitrate regulation + rtmp_server_video_target_bitrate_key + Video target bitrate (kb/s) + rtmp_server_video_min_bitrate_key + Video minimum bitrate (kb/s) + URL file_endpoint_key diff --git a/demos/camera/src/main/res/xml/root_preferences.xml b/demos/camera/src/main/res/xml/root_preferences.xml index e47de9ad3..58a3a89b7 100644 --- a/demos/camera/src/main/res/xml/root_preferences.xml +++ b/demos/camera/src/main/res/xml/root_preferences.xml @@ -167,11 +167,33 @@ app:title="@string/rtmp_server"> + + + + + { private val coroutineScope = CoroutineScope(SupervisorJob() + defaultDispatcher) private val mutex = Mutex() + private val metricsLock = Any() + private var frameDropped = 0L + private var audioFrameDropped = 0L + private var videoFrameDropped = 0L + + private var payloadSendDroppedSize = 0L + private var audioPayloadSendDroppedSize = 0L + private var videoPayloadSendDroppedSize = 0L + private val flvTagChannel = ChannelWithCloseableData( - 10 /* Arbitrary buffer size. TODO: add a parameter to set it */, BufferOverflow.DROP_OLDEST + 10 /* Arbitrary buffer size. TODO: add a parameter to set it */, BufferOverflow.DROP_OLDEST, + onUndeliveredElement = { flvTag -> + synchronized(metricsLock) { + val payloadSize = flvTag.data.getSize(AmfVersion.AMF0) + frameDropped++ + payloadSendDroppedSize += payloadSize + if (flvTag.data is AudioData) { + audioFrameDropped++ + audioPayloadSendDroppedSize += payloadSize + } else if (flvTag.data is VideoData) { + videoFrameDropped++ + videoPayloadSendDroppedSize += payloadSize + } + } + } ) private val flvTagBuilder = FlvTagBuilder(flvTagChannel) @@ -70,12 +100,24 @@ class RtmpEndpoint internal constructor( private val connectionBuilder = RtmpConnectionBuilder(selectorManager) private var rtmpClient: RtmpClient? = null - private var startUpTimestamp = INVALID_TIMESTAMP private val timestampMutex = Mutex() - override val metrics: Any - get() = TODO("Not yet implemented") + override val metrics: RtmpMetrics + get() { + val metrics = + rtmpClient?.metrics ?: return RtmpMetrics.ZERO + return synchronized(metricsLock) { + metrics.copy( + messagesSendDropped = metrics.messagesSendDropped + frameDropped, + audioMessagesSendDropped = metrics.audioMessagesSendDropped + audioFrameDropped, + videoMessagesSendDropped = metrics.videoMessagesSendDropped + videoFrameDropped, + payloadSendDroppedSize = metrics.payloadSendDroppedSize + payloadSendDroppedSize, + audioPayloadSendDroppedSize = metrics.audioPayloadSendDroppedSize + audioPayloadSendDroppedSize, + videoPayloadSendDroppedSize = metrics.videoPayloadSendDroppedSize + videoPayloadSendDroppedSize + ) + } + } private val _isOpenFlow = MutableStateFlow(false) override val isOpenFlow = _isOpenFlow.asStateFlow() @@ -95,6 +137,18 @@ class RtmpEndpoint internal constructor( } } + private fun resetMetrics() { + synchronized(metricsLock) { + frameDropped = 0L + audioFrameDropped = 0L + videoFrameDropped = 0L + + payloadSendDroppedSize = 0L + audioPayloadSendDroppedSize = 0L + videoPayloadSendDroppedSize = 0L + } + } + private suspend fun safeClient(block: suspend (RtmpClient) -> T): T { val rtmpClient = requireNotNull(rtmpClient) { "Not opened" } require(!rtmpClient.isClosed) { "Connection closed" } @@ -109,6 +163,7 @@ class RtmpEndpoint internal constructor( return@withContext } + resetMetrics() rtmpClient = connectionBuilder.connect(descriptor.uri.toString()).apply { _isOpenFlow.emit(true) diff --git a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/RtmpBitrateRegulator.kt b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/RtmpBitrateRegulator.kt new file mode 100644 index 000000000..89d4728d8 --- /dev/null +++ b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/RtmpBitrateRegulator.kt @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2026 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.ext.rtmp.regulator + +import io.github.komedia.komuxer.rtmp.util.metrics.RtmpMetrics +import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig +import io.github.thibaultbee.streampack.core.regulator.BitrateRegulator +import io.github.thibaultbee.streampack.core.regulator.IBitrateRegulator + +/** + * Base class of RTMP bitrate regulation implementation. + * + * If you want to implement your custom bitrate regulator, it must inherit from this class. + * The bitrate regulator object is created by streamers with the [IBitrateRegulator.Factory]. + * + * @param bitrateRegulatorConfig bitrate regulation configuration + * @param onVideoTargetBitrateChange call when you have to change video bitrate + * @param onAudioTargetBitrateChange call when you have to change audio bitrate + */ +abstract class RtmpBitrateRegulator( + bitrateRegulatorConfig: BitrateRegulatorConfig, + onVideoTargetBitrateChange: ((Int) -> Unit), + onAudioTargetBitrateChange: ((Int) -> Unit) +) : BitrateRegulator( + bitrateRegulatorConfig, + onVideoTargetBitrateChange, + onAudioTargetBitrateChange +) { + override fun update(metrics: Any, currentVideoBitrate: Int, currentAudioBitrate: Int) = + update(metrics as RtmpMetrics, currentVideoBitrate, currentAudioBitrate) + + /** + * Call regularly to get new RTMP metrics + * + * @param metrics RTMP transmission metrics + * @param currentVideoBitrate current video bitrate target in bits/s. + * @param currentAudioBitrate current audio bitrate target in bits/s. + */ + abstract fun update(metrics: RtmpMetrics, currentVideoBitrate: Int, currentAudioBitrate: Int) + + /** + * Factory interface you must use to create a [RtmpBitrateRegulator] object. + * If you want to create a custom RTMP bitrate regulation implementation, create a factory that + * implements this interface. + */ + interface Factory : IBitrateRegulator.Factory { + /** + * Creates a [RtmpBitrateRegulator] object from given parameters + * + * @param bitrateRegulatorConfig bitrate regulation configuration + * @param onVideoTargetBitrateChange call when you have to change video bitrate + * @param onAudioTargetBitrateChange call when you have to change audio bitrate + * @return a [RtmpBitrateRegulator] object + */ + override fun newBitrateRegulator( + bitrateRegulatorConfig: BitrateRegulatorConfig, + onVideoTargetBitrateChange: ((Int) -> Unit), + onAudioTargetBitrateChange: ((Int) -> Unit) + ): RtmpBitrateRegulator + } +} \ No newline at end of file diff --git a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/SimpleRtmpBitrateRegulator.kt b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/SimpleRtmpBitrateRegulator.kt new file mode 100644 index 000000000..de1b248d5 --- /dev/null +++ b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/SimpleRtmpBitrateRegulator.kt @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2026 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.ext.rtmp.regulator + +import io.github.komedia.komuxer.rtmp.util.metrics.RtmpMetrics +import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig +import io.github.thibaultbee.streampack.core.regulator.BitrateRegulator +import kotlin.math.max +import kotlin.math.min + +/** + * A [BitrateRegulator] that reduce video bitrate when packets are lost. + * + * @param bitrateRegulatorConfig bitrate regulation configuration + * @param onVideoTargetBitrateChange call when you have to change video bitrate + * @param onAudioTargetBitrateChange call when you have to change audio bitrate + */ +class SimpleRtmpBitrateRegulator( + bitrateRegulatorConfig: BitrateRegulatorConfig, + onVideoTargetBitrateChange: ((Int) -> Unit), + onAudioTargetBitrateChange: ((Int) -> Unit) +) : RtmpBitrateRegulator( + bitrateRegulatorConfig, + onVideoTargetBitrateChange, + onAudioTargetBitrateChange +) { + companion object { + const val MINIMUM_DECREASE_THRESHOLD = 100000 // b/s + const val MAXIMUM_INCREASE_THRESHOLD = 200000 // b/s + } + + /** + * Call regularly to get new RTMP metrics + * + * @param metrics RTMP transmission metrics + * @param currentVideoBitrate current video bitrate target in bits/s. + * @param currentAudioBitrate current audio bitrate target in bits/s. + */ + override fun update(metrics: RtmpMetrics, currentVideoBitrate: Int, currentAudioBitrate: Int) { + if (metrics.messagesSendDropped > 0) { + // Detected packet loss - quickly react + val newVideoBitrate = currentVideoBitrate - max( + currentVideoBitrate * 20 / 100, // too late - drop bitrate by 20 % + MINIMUM_DECREASE_THRESHOLD // getting down by 100000 b/s minimum + ) + onVideoTargetBitrateChange( + max( + newVideoBitrate, + bitrateRegulatorConfig.videoBitrateRange.lower + ) + ) + } else if (currentVideoBitrate < bitrateRegulatorConfig.videoBitrateRange.upper) { + // Try to increase to the max target + val newVideoBitrate = currentVideoBitrate + min( + (bitrateRegulatorConfig.videoBitrateRange.upper - currentVideoBitrate) * 50 / 100, // getting slower when reaching target bitrate + MAXIMUM_INCREASE_THRESHOLD // not increasing to fast + ) + onVideoTargetBitrateChange( + min( + newVideoBitrate, + bitrateRegulatorConfig.videoBitrateRange.upper + ) + ) + } + } + + /** + * Factory interface you must use to create a [SimpleRtmpBitrateRegulator] object. + * If you want to create a custom RTMP bitrate regulation implementation, create a factory that + * implements this interface. + */ + class Factory : RtmpBitrateRegulator.Factory { + /** + * Creates a [SimpleRtmpBitrateRegulator] object from given parameters + * + * @param bitrateRegulatorConfig bitrate regulation configuration + * @param onVideoTargetBitrateChange call when you have to change video bitrate + * @param onAudioTargetBitrateChange call when you have to change audio bitrate + * @return a [SimpleRtmpBitrateRegulator] object + */ + override fun newBitrateRegulator( + bitrateRegulatorConfig: BitrateRegulatorConfig, + onVideoTargetBitrateChange: ((Int) -> Unit), + onAudioTargetBitrateChange: ((Int) -> Unit) + ): SimpleRtmpBitrateRegulator { + return SimpleRtmpBitrateRegulator( + bitrateRegulatorConfig, + onVideoTargetBitrateChange, + onAudioTargetBitrateChange + ) + } + } +} \ No newline at end of file diff --git a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/controllers/RtmpBitrateRegulatorControllerFactories.kt b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/controllers/RtmpBitrateRegulatorControllerFactories.kt new file mode 100644 index 000000000..eb102122a --- /dev/null +++ b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/controllers/RtmpBitrateRegulatorControllerFactories.kt @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2026 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.ext.rtmp.regulator.controllers + +import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig +import io.github.thibaultbee.streampack.core.regulator.controllers.SimpleBitrateRegulatorController +import io.github.thibaultbee.streampack.core.regulator.controllers.SimpleBitrateRegulatorController.Companion.DEFAULT_POLLING_TIME_IN_MS +import io.github.thibaultbee.streampack.ext.rtmp.regulator.RtmpBitrateRegulator +import io.github.thibaultbee.streampack.ext.rtmp.regulator.SimpleRtmpBitrateRegulator + +/** + * A [SimpleBitrateRegulatorController.Factory] for [RtmpBitrateRegulator]. + * + * @param bitrateRegulatorFactory the [RtmpBitrateRegulator.Factory] implementation. Use it to make your own bitrate regulator. + * @param bitrateRegulatorConfig bitrate regulator configuration + * @param pollingTimeInMs delay between each call to [RtmpBitrateRegulator.update] + * + * @see SimpleBitrateRegulatorController.Factory + * @see SimpleRtmpBitrateRegulator.Factory + */ +fun simpleRtmpBitrateRegulatorControllerFactory( + bitrateRegulatorFactory: RtmpBitrateRegulator.Factory = SimpleRtmpBitrateRegulator.Factory(), + bitrateRegulatorConfig: BitrateRegulatorConfig = BitrateRegulatorConfig(), + pollingTimeInMs: Long = DEFAULT_POLLING_TIME_IN_MS +) = SimpleBitrateRegulatorController.Factory( + bitrateRegulatorFactory, bitrateRegulatorConfig, pollingTimeInMs +) diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/DummySrtBitrateRegulator.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/DummySrtBitrateRegulator.kt index 65dfd6916..65101a21b 100644 --- a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/DummySrtBitrateRegulator.kt +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/DummySrtBitrateRegulator.kt @@ -42,12 +42,12 @@ class DummySrtBitrateRegulator( const val SEND_PACKET_THRESHOLD = 50 } - override fun update(stats: Stats, currentVideoBitrate: Int, currentAudioBitrate: Int) { - val estimatedBandwidth = (stats.mbpsBandwidth * 1000000).toInt() + override fun update(metrics: Stats, currentVideoBitrate: Int, currentAudioBitrate: Int) { + val estimatedBandwidth = (metrics.mbpsBandwidth * 1000000).toInt() if (currentVideoBitrate > bitrateRegulatorConfig.videoBitrateRange.lower) { val newVideoBitrate = when { - stats.pktSndLoss > 0 -> { + metrics.pktSndLoss > 0 -> { // Detected packet loss - quickly react currentVideoBitrate - max( currentVideoBitrate * 20 / 100, // too late - drop bitrate by 20 % @@ -55,7 +55,7 @@ class DummySrtBitrateRegulator( ) } - stats.pktSndBuf > SEND_PACKET_THRESHOLD -> { + metrics.pktSndBuf > SEND_PACKET_THRESHOLD -> { // Try to avoid congestion currentVideoBitrate - max( currentVideoBitrate * 10 / 100, // drop bitrate by 10 % @@ -80,10 +80,8 @@ class DummySrtBitrateRegulator( ) // Don't go under videoBitrateRange.lower return } - } - - // Can bitrate go upper? - if (currentVideoBitrate < bitrateRegulatorConfig.videoBitrateRange.upper) { + // Can bitrate go upper? + } else if (currentVideoBitrate < bitrateRegulatorConfig.videoBitrateRange.upper) { val newVideoBitrate = when { (currentVideoBitrate + currentAudioBitrate) < estimatedBandwidth -> { currentVideoBitrate + min( diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/SrtBitrateRegulator.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/SrtBitrateRegulator.kt index dbfb53a84..6e5b4443c 100644 --- a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/SrtBitrateRegulator.kt +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/SrtBitrateRegulator.kt @@ -39,17 +39,17 @@ abstract class SrtBitrateRegulator( onVideoTargetBitrateChange, onAudioTargetBitrateChange ) { - override fun update(stats: Any, currentVideoBitrate: Int, currentAudioBitrate: Int) = - update(stats as Stats, currentVideoBitrate, currentAudioBitrate) + override fun update(metrics: Any, currentVideoBitrate: Int, currentAudioBitrate: Int) = + update(metrics as Stats, currentVideoBitrate, currentAudioBitrate) /** - * Call regularly to get new SRT stats + * Call regularly to get new SRT metrics * - * @param stats SRT transmission stats + * @param metrics SRT transmission metrics * @param currentVideoBitrate current video bitrate target in bits/s. * @param currentAudioBitrate current audio bitrate target in bits/s. */ - abstract fun update(stats: Stats, currentVideoBitrate: Int, currentAudioBitrate: Int) + abstract fun update(metrics: Stats, currentVideoBitrate: Int, currentAudioBitrate: Int) /** * Factory interface you must use to create a [SrtBitrateRegulator] object. diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/DefaultSrtBitrateRegulatorController.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/SrtBitrateRegulatorControllerFactories.kt similarity index 100% rename from extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/DefaultSrtBitrateRegulatorController.kt rename to extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/SrtBitrateRegulatorControllerFactories.kt diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 2cb2b2401..feda42b74 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -27,7 +27,7 @@ kotlinxIo = "0.8.0" material = "1.13.0" mockk = "1.14.5" robolectric = "4.16" -komuxer = "0.3.4" +komuxer = "0.4.0" srtdroid = "1.9.5" junitKtx = "1.3.0" compose = "1.10.1" diff --git a/settings.gradle.kts b/settings.gradle.kts index de694da52..16ddf629a 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -15,6 +15,7 @@ dependencyResolutionManagement { repositoriesMode.set(RepositoriesMode.FAIL_ON_PROJECT_REPOS) repositories { google() + mavenLocal() mavenCentral() } } From ce139838c918d356dc8552d1ef8298200eaa5347 Mon Sep 17 00:00:00 2001 From: ThibaultBee <37510686+ThibaultBee@users.noreply.github.com> Date: Wed, 13 May 2026 09:40:23 +0200 Subject: [PATCH 2/5] chore(srt): add missing header license --- .../srt/elements/endpoints/SrtEndpointFactory.kt | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/SrtEndpointFactory.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/SrtEndpointFactory.kt index 920181103..3c05230d9 100644 --- a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/SrtEndpointFactory.kt +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/SrtEndpointFactory.kt @@ -1,3 +1,18 @@ +/* + * Copyright (C) 2026 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.github.thibaultbee.streampack.ext.srt.elements.endpoints import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.createDefaultTsServiceInfo From 1a2f0091c45bb1acd9c5c31e0f757f4d856f696b Mon Sep 17 00:00:00 2001 From: ThibaultBee <37510686+ThibaultBee@users.noreply.github.com> Date: Wed, 13 May 2026 09:43:27 +0200 Subject: [PATCH 3/5] feat(*): add a dedicated `WithMetrics` interface. --- .../elements/endpoints/CombineEndpoint.kt | 8 -- .../core/elements/endpoints/DummyEndpoint.kt | 2 - .../elements/endpoints/DynamicEndpoint.kt | 9 +- .../core/elements/endpoints/IEndpoint.kt | 5 - .../elements/endpoints/MediaMuxerEndpoint.kt | 3 - .../endpoints/composites/CompositeEndpoint.kt | 3 - .../composites/sinks/AbstractSink.kt | 3 - .../endpoints/composites/sinks/ISink.kt | 5 - .../core/elements/interfaces/WithMetrics.kt | 23 ++++ .../core/regulator/BitrateRegulator.kt | 4 +- .../core/regulator/IBitrateRegulator.kt | 8 +- .../controllers/BitrateRegulatorController.kt | 8 +- ... => IntervalBitrateRegulatorController.kt} | 24 ++-- .../app/ui/main/PreviewViewModel.kt | 9 +- .../flv/elements/endpoints/FlvEndpoints.kt | 3 - .../rtmp/regulator/RtmpBitrateRegulator.kt | 13 ++- ...RtmpBitrateRegulatorControllerFactories.kt | 12 +- .../endpoints/composites/sinks/SrtSink.kt | 8 +- .../ext/srt/regulator/SrtBitrateRegulator.kt | 9 +- .../SrtBitrateRegulatorControllerFactories.kt | 12 +- .../ext/srt/utils/SrtStatsHelper.kt | 107 ++++++++++++++++++ 21 files changed, 191 insertions(+), 87 deletions(-) create mode 100644 core/src/main/java/io/github/thibaultbee/streampack/core/elements/interfaces/WithMetrics.kt rename core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/{SimpleBitrateRegulatorController.kt => IntervalBitrateRegulatorController.kt} (87%) create mode 100644 extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/utils/SrtStatsHelper.kt diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/CombineEndpoint.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/CombineEndpoint.kt index f9d235323..190e95b7e 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/CombineEndpoint.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/CombineEndpoint.kt @@ -115,14 +115,6 @@ open class CombineEndpoint( .reduce { acc, iEndpointInfo -> acc intersect iEndpointInfo } } - /** - * Throws [UnsupportedOperationException] because [CombineEndpoint] does not have metrics. - * - * Call [IEndpoint.metrics] on each endpoint to get their metrics. - */ - override val metrics: Any - get() = throw UnsupportedOperationException("CombineEndpoint does not have metrics.") - private fun createNewStreamId(): Int { var i = 0 while (endpointsToStreamIdsMap.keys.any { it.second == i }) { diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DummyEndpoint.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DummyEndpoint.kt index 429b0a078..9d5b06d0a 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DummyEndpoint.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DummyEndpoint.kt @@ -41,8 +41,6 @@ class DummyEndpoint : IEndpointInternal { TODO("Not yet implemented") } - override val metrics: Any - get() = TODO("Not yet implemented") override val throwableFlow: StateFlow = MutableStateFlow(null).asStateFlow() override suspend fun open(descriptor: MediaDescriptor) { diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt index 1951fabc0..38d17798b 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt @@ -26,6 +26,7 @@ import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxer import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.ContentSink import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.FileSink import io.github.thibaultbee.streampack.core.elements.utils.ConflatedJob +import io.github.thibaultbee.streampack.core.elements.interfaces.WithMetrics import io.github.thibaultbee.streampack.core.logger.Logger import io.github.thibaultbee.streampack.core.pipelines.IDispatcherProvider import kotlinx.coroutines.CoroutineDispatcher @@ -48,7 +49,7 @@ open class DynamicEndpoint( private val context: Context, private val defaultDispatcher: CoroutineDispatcher, private val ioDispatcher: CoroutineDispatcher -) : IEndpointInternal { +) : IEndpointInternal, WithMetrics { private val coroutineScope = CoroutineScope(defaultDispatcher) private val mutex = Mutex() @@ -85,7 +86,11 @@ open class DynamicEndpoint( override fun getInfo(type: MediaDescriptor.Type) = getEndpoint(type).getInfo(type) override val metrics: Any - get() = endpoint?.metrics ?: throw IllegalStateException("Endpoint is not opened") + get() { + val endpoint = endpoint ?: throw IllegalStateException("Endpoint is not opened") + return (endpoint as? WithMetrics<*>)?.metrics + ?: throw UnsupportedOperationException("Current endpoint does not support metrics") + } init { coroutineScope.launch { diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/IEndpoint.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/IEndpoint.kt index 715c7d5c8..8f4fa80be 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/IEndpoint.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/IEndpoint.kt @@ -170,9 +170,4 @@ interface IEndpoint { val supportedEncoders: List } } - - /** - * Metrics of the endpoint. - */ - val metrics: Any } \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/MediaMuxerEndpoint.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/MediaMuxerEndpoint.kt index cedb2462a..0be8a776e 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/MediaMuxerEndpoint.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/MediaMuxerEndpoint.kt @@ -64,9 +64,6 @@ class MediaMuxerEndpoint( override fun getInfo(type: MediaDescriptor.Type) = Companion.getInfo(type) - override val metrics: Any - get() = TODO("Not yet implemented") - private val _isOpenFlow = MutableStateFlow(false) override val isOpenFlow = _isOpenFlow.asStateFlow() diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/CompositeEndpoint.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/CompositeEndpoint.kt index e85726a65..50e0850cf 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/CompositeEndpoint.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/CompositeEndpoint.kt @@ -51,9 +51,6 @@ class CompositeEndpoint( override val info by lazy { EndpointInfo(muxer.info) } override fun getInfo(type: MediaDescriptor.Type) = info - override val metrics: Any - get() = sink.metrics - init { muxer.listener = object : IMuxerInternal.IMuxerListener { diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/AbstractSink.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/AbstractSink.kt index 91ac63a37..de4c45efa 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/AbstractSink.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/AbstractSink.kt @@ -13,9 +13,6 @@ import io.github.thibaultbee.streampack.core.logger.Logger abstract class AbstractSink : ISinkInternal { abstract val supportedSinkTypes: List - override val metrics: Any - get() = TODO("Not yet implemented") - override suspend fun open(mediaDescriptor: MediaDescriptor) { if (isOpenFlow.value) { Logger.w(TAG, "Sink is already opened") diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/ISink.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/ISink.kt index 1e36c9cdd..76463db4a 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/ISink.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/ISink.kt @@ -45,9 +45,4 @@ interface ISink { * For example, if the file is opened for [FileSink]. */ val isOpenFlow: StateFlow - - /** - * Metrics of the sink. - */ - val metrics: Any } \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/interfaces/WithMetrics.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/interfaces/WithMetrics.kt new file mode 100644 index 000000000..1f9602754 --- /dev/null +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/interfaces/WithMetrics.kt @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2026 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.core.elements.interfaces + +interface WithMetrics { + /** + * Metrics of the element. + */ + val metrics: T +} diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/BitrateRegulator.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/BitrateRegulator.kt index ba7c86fb4..4ad6b4091 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/BitrateRegulator.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/BitrateRegulator.kt @@ -27,8 +27,8 @@ import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfi * @param onVideoTargetBitrateChange call when you have to change video bitrate * @param onAudioTargetBitrateChange call when you have to change audio bitrate */ -abstract class BitrateRegulator( +abstract class BitrateRegulator( protected val bitrateRegulatorConfig: BitrateRegulatorConfig, protected val onVideoTargetBitrateChange: ((Int) -> Unit), protected val onAudioTargetBitrateChange: ((Int) -> Unit) -) : IBitrateRegulator +) : IBitrateRegulator diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/IBitrateRegulator.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/IBitrateRegulator.kt index 4cd1a5a11..5fda2d569 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/IBitrateRegulator.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/IBitrateRegulator.kt @@ -20,7 +20,7 @@ import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfi /** * Interface to implement a bitrate regulator. */ -interface IBitrateRegulator { +interface IBitrateRegulator { /** * Calls regularly to get new metrics * @@ -28,14 +28,14 @@ interface IBitrateRegulator { * @param currentVideoBitrate current video bitrate target in bits/s. * @param currentAudioBitrate current audio bitrate target in bits/s. */ - fun update(metrics: Any, currentVideoBitrate: Int, currentAudioBitrate: Int) + fun update(metrics: T, currentVideoBitrate: Int, currentAudioBitrate: Int) /** * Factory interface you must use to create a [BitrateRegulator] object. * If you want to create a custom bitrate regulation implementation, create a factory that * implements this interface. */ - interface Factory { + interface Factory { /** * Creates a [BitrateRegulator] object from given parameters * @@ -48,6 +48,6 @@ interface IBitrateRegulator { bitrateRegulatorConfig: BitrateRegulatorConfig, onVideoTargetBitrateChange: ((Int) -> Unit), onAudioTargetBitrateChange: ((Int) -> Unit) - ): BitrateRegulator + ): BitrateRegulator } } \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/BitrateRegulatorController.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/BitrateRegulatorController.kt index 2ba195240..fab35bc86 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/BitrateRegulatorController.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/BitrateRegulatorController.kt @@ -17,7 +17,7 @@ package io.github.thibaultbee.streampack.core.regulator.controllers import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig import io.github.thibaultbee.streampack.core.elements.encoders.IEncoder -import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpoint +import io.github.thibaultbee.streampack.core.elements.interfaces.WithMetrics import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IEncodingPipelineOutput import io.github.thibaultbee.streampack.core.regulator.IBitrateRegulator import kotlinx.coroutines.CoroutineDispatcher @@ -27,15 +27,15 @@ import kotlinx.coroutines.CoroutineDispatcher * * @param audioEncoder the audio [IEncoder] * @param videoEncoder the video [IEncoder] - * @param endpoint the [IEndpoint] implementation + * @param endpoint the [WithMetrics] implementation * @param bitrateRegulatorFactory the [IBitrateRegulator.Factory] implementation. Use it to make your own bitrate regulator. * @param bitrateRegulatorConfig bitrate regulator configuration */ abstract class BitrateRegulatorController( private val audioEncoder: IEncoder?, private val videoEncoder: IEncoder?, - private val endpoint: IEndpoint, - private val bitrateRegulatorFactory: IBitrateRegulator.Factory, + private val endpoint: WithMetrics<*>, + private val bitrateRegulatorFactory: IBitrateRegulator.Factory<*>, private val bitrateRegulatorConfig: BitrateRegulatorConfig = BitrateRegulatorConfig() ) : IBitrateRegulatorController { abstract class Factory : IBitrateRegulatorController.Factory { diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/SimpleBitrateRegulatorController.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/IntervalBitrateRegulatorController.kt similarity index 87% rename from core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/SimpleBitrateRegulatorController.kt rename to core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/IntervalBitrateRegulatorController.kt index cb39e2d3d..006c9eda2 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/SimpleBitrateRegulatorController.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/IntervalBitrateRegulatorController.kt @@ -17,7 +17,7 @@ package io.github.thibaultbee.streampack.core.regulator.controllers import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig import io.github.thibaultbee.streampack.core.elements.encoders.IEncoder -import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpoint +import io.github.thibaultbee.streampack.core.elements.interfaces.WithMetrics import io.github.thibaultbee.streampack.core.elements.utils.CoroutineScheduler import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IConfigurableAudioEncodingPipelineOutput import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IConfigurableVideoEncodingPipelineOutput @@ -30,23 +30,23 @@ import kotlinx.coroutines.CoroutineDispatcher * * @param audioEncoder the audio [IEncoder] * @param videoEncoder the video [IEncoder] - * @param endpoint the [IEndpoint] implementation + * @param metricsProvider the [WithMetrics] implementation * @param bitrateRegulatorFactory the [IBitrateRegulator.Factory] implementation. Use it to make your own bitrate regulator. * @param bitrateRegulatorConfig bitrate regulator configuration * @param pollingTimeInMs delay between each call to [IBitrateRegulator.update] */ -open class SimpleBitrateRegulatorController( +open class IntervalBitrateRegulatorController( audioEncoder: IEncoder?, videoEncoder: IEncoder, - endpoint: IEndpoint, - bitrateRegulatorFactory: IBitrateRegulator.Factory, + metricsProvider: WithMetrics, + bitrateRegulatorFactory: IBitrateRegulator.Factory, coroutineDispatcher: CoroutineDispatcher, bitrateRegulatorConfig: BitrateRegulatorConfig = BitrateRegulatorConfig(), pollingTimeInMs: Long = DEFAULT_POLLING_TIME_IN_MS ) : BitrateRegulatorController( audioEncoder, videoEncoder, - endpoint, + metricsProvider, bitrateRegulatorFactory, bitrateRegulatorConfig ) { @@ -66,7 +66,7 @@ open class SimpleBitrateRegulatorController( */ private val scheduler = CoroutineScheduler(pollingTimeInMs, coroutineDispatcher) { bitrateRegulator.update( - endpoint.metrics, + metricsProvider.metrics, videoEncoder.bitrate, audioEncoder?.bitrate ?: 0 ) @@ -84,8 +84,8 @@ open class SimpleBitrateRegulatorController( const val DEFAULT_POLLING_TIME_IN_MS = 500L } - class Factory( - private val bitrateRegulatorFactory: IBitrateRegulator.Factory, + class Factory( + private val bitrateRegulatorFactory: IBitrateRegulator.Factory, private val bitrateRegulatorConfig: BitrateRegulatorConfig = BitrateRegulatorConfig(), private val pollingTimeInMs: Long = DEFAULT_POLLING_TIME_IN_MS ) : BitrateRegulatorController.Factory() { @@ -106,10 +106,12 @@ open class SimpleBitrateRegulatorController( } else { null } - return SimpleBitrateRegulatorController( + @Suppress("UNCHECKED_CAST") + val endpoint = pipelineOutput.endpoint as WithMetrics + return IntervalBitrateRegulatorController( audioEncoder, videoEncoder, - pipelineOutput.endpoint, + endpoint, bitrateRegulatorFactory, coroutineDispatcher, bitrateRegulatorConfig, diff --git a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/main/PreviewViewModel.kt b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/main/PreviewViewModel.kt index 35d331522..9043fc4e4 100644 --- a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/main/PreviewViewModel.kt +++ b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/main/PreviewViewModel.kt @@ -65,8 +65,8 @@ import io.github.thibaultbee.streampack.core.streamers.single.VideoOnlySingleStr import io.github.thibaultbee.streampack.core.streamers.single.withAudio import io.github.thibaultbee.streampack.core.streamers.single.withVideo import io.github.thibaultbee.streampack.core.utils.extensions.isClosedException -import io.github.thibaultbee.streampack.ext.rtmp.regulator.controllers.simpleRtmpBitrateRegulatorControllerFactory -import io.github.thibaultbee.streampack.ext.srt.regulator.controllers.simpleSrtBitrateRegulatorControllerFactory +import io.github.thibaultbee.streampack.ext.rtmp.regulator.controllers.intervalRtmpBitrateRegulatorControllerFactory +import io.github.thibaultbee.streampack.ext.srt.regulator.controllers.intervalSrtBitrateRegulatorControllerFactory import kotlinx.coroutines.CancellationException import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job @@ -78,7 +78,6 @@ import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filterNotNull import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.map -import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex @@ -322,13 +321,13 @@ class PreviewViewModel(private val application: Application) : ObservableViewMod val controllerFactory = when (descriptor.type.sinkType) { MediaSinkType.RTMP -> { - simpleRtmpBitrateRegulatorControllerFactory( + intervalRtmpBitrateRegulatorControllerFactory( bitrateRegulatorConfig = bitrateRegulatorConfig ) } MediaSinkType.SRT -> { - simpleSrtBitrateRegulatorControllerFactory( + intervalSrtBitrateRegulatorControllerFactory( bitrateRegulatorConfig = bitrateRegulatorConfig ) } diff --git a/extensions/flv/src/main/java/io/github/thibaultbee/streampack/ext/flv/elements/endpoints/FlvEndpoints.kt b/extensions/flv/src/main/java/io/github/thibaultbee/streampack/ext/flv/elements/endpoints/FlvEndpoints.kt index 661315379..99d68045a 100644 --- a/extensions/flv/src/main/java/io/github/thibaultbee/streampack/ext/flv/elements/endpoints/FlvEndpoints.kt +++ b/extensions/flv/src/main/java/io/github/thibaultbee/streampack/ext/flv/elements/endpoints/FlvEndpoints.kt @@ -63,9 +63,6 @@ sealed class FlvEndpoint( private var startUpTimestamp = INVALID_TIMESTAMP private val timestampMutex = Mutex() - override val metrics: Any - get() = TODO("Not yet implemented") - private val _isOpenFlow = MutableStateFlow(false) override val isOpenFlow: StateFlow = _isOpenFlow.asStateFlow() diff --git a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/RtmpBitrateRegulator.kt b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/RtmpBitrateRegulator.kt index 89d4728d8..6cf519dc3 100644 --- a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/RtmpBitrateRegulator.kt +++ b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/RtmpBitrateRegulator.kt @@ -34,14 +34,11 @@ abstract class RtmpBitrateRegulator( bitrateRegulatorConfig: BitrateRegulatorConfig, onVideoTargetBitrateChange: ((Int) -> Unit), onAudioTargetBitrateChange: ((Int) -> Unit) -) : BitrateRegulator( +) : BitrateRegulator( bitrateRegulatorConfig, onVideoTargetBitrateChange, onAudioTargetBitrateChange ) { - override fun update(metrics: Any, currentVideoBitrate: Int, currentAudioBitrate: Int) = - update(metrics as RtmpMetrics, currentVideoBitrate, currentAudioBitrate) - /** * Call regularly to get new RTMP metrics * @@ -49,14 +46,18 @@ abstract class RtmpBitrateRegulator( * @param currentVideoBitrate current video bitrate target in bits/s. * @param currentAudioBitrate current audio bitrate target in bits/s. */ - abstract fun update(metrics: RtmpMetrics, currentVideoBitrate: Int, currentAudioBitrate: Int) + abstract override fun update( + metrics: RtmpMetrics, + currentVideoBitrate: Int, + currentAudioBitrate: Int + ) /** * Factory interface you must use to create a [RtmpBitrateRegulator] object. * If you want to create a custom RTMP bitrate regulation implementation, create a factory that * implements this interface. */ - interface Factory : IBitrateRegulator.Factory { + interface Factory : IBitrateRegulator.Factory { /** * Creates a [RtmpBitrateRegulator] object from given parameters * diff --git a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/controllers/RtmpBitrateRegulatorControllerFactories.kt b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/controllers/RtmpBitrateRegulatorControllerFactories.kt index eb102122a..a02f58149 100644 --- a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/controllers/RtmpBitrateRegulatorControllerFactories.kt +++ b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/controllers/RtmpBitrateRegulatorControllerFactories.kt @@ -16,25 +16,25 @@ package io.github.thibaultbee.streampack.ext.rtmp.regulator.controllers import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig -import io.github.thibaultbee.streampack.core.regulator.controllers.SimpleBitrateRegulatorController -import io.github.thibaultbee.streampack.core.regulator.controllers.SimpleBitrateRegulatorController.Companion.DEFAULT_POLLING_TIME_IN_MS +import io.github.thibaultbee.streampack.core.regulator.controllers.IntervalBitrateRegulatorController +import io.github.thibaultbee.streampack.core.regulator.controllers.IntervalBitrateRegulatorController.Companion.DEFAULT_POLLING_TIME_IN_MS import io.github.thibaultbee.streampack.ext.rtmp.regulator.RtmpBitrateRegulator import io.github.thibaultbee.streampack.ext.rtmp.regulator.SimpleRtmpBitrateRegulator /** - * A [SimpleBitrateRegulatorController.Factory] for [RtmpBitrateRegulator]. + * A [IntervalBitrateRegulatorController.Factory] for [RtmpBitrateRegulator]. * * @param bitrateRegulatorFactory the [RtmpBitrateRegulator.Factory] implementation. Use it to make your own bitrate regulator. * @param bitrateRegulatorConfig bitrate regulator configuration * @param pollingTimeInMs delay between each call to [RtmpBitrateRegulator.update] * - * @see SimpleBitrateRegulatorController.Factory + * @see IntervalBitrateRegulatorController.Factory * @see SimpleRtmpBitrateRegulator.Factory */ -fun simpleRtmpBitrateRegulatorControllerFactory( +fun intervalRtmpBitrateRegulatorControllerFactory( bitrateRegulatorFactory: RtmpBitrateRegulator.Factory = SimpleRtmpBitrateRegulator.Factory(), bitrateRegulatorConfig: BitrateRegulatorConfig = BitrateRegulatorConfig(), pollingTimeInMs: Long = DEFAULT_POLLING_TIME_IN_MS -) = SimpleBitrateRegulatorController.Factory( +) = IntervalBitrateRegulatorController.Factory( bitrateRegulatorFactory, bitrateRegulatorConfig, pollingTimeInMs ) diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/composites/sinks/SrtSink.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/composites/sinks/SrtSink.kt index f5659ffee..cb1f2bfc6 100644 --- a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/composites/sinks/SrtSink.kt +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/composites/sinks/SrtSink.kt @@ -30,13 +30,16 @@ import io.github.thibaultbee.streampack.core.elements.endpoints.composites.data. import io.github.thibaultbee.streampack.core.elements.endpoints.composites.data.SrtPacket import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.AbstractSink import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.SinkConfiguration +import io.github.thibaultbee.streampack.core.elements.interfaces.WithMetrics import io.github.thibaultbee.streampack.core.logger.Logger import io.github.thibaultbee.streampack.ext.srt.configuration.mediadescriptor.SrtMediaDescriptor +import io.github.thibaultbee.streampack.ext.srt.utils.SrtStatsHelper import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asStateFlow -class SrtSink(private val coroutineDispatcher: CoroutineDispatcher) : AbstractSink() { +class SrtSink(private val coroutineDispatcher: CoroutineDispatcher) : AbstractSink(), + WithMetrics { override val supportedSinkTypes: List = listOf(MediaSinkType.SRT) private var socket: CoroutineSrtSocket? = null @@ -49,8 +52,7 @@ class SrtSink(private val coroutineDispatcher: CoroutineDispatcher) : AbstractSi * Get SRT stats */ override val metrics: Stats - get() = socket?.bistats(clear = true, instantaneous = true) - ?: throw IllegalStateException("Socket is not initialized") + get() = socket?.bistats(clear = true, instantaneous = true) ?: SrtStatsHelper.ZERO private val _isOpenFlow = MutableStateFlow(false) override val isOpenFlow = _isOpenFlow.asStateFlow() diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/SrtBitrateRegulator.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/SrtBitrateRegulator.kt index 6e5b4443c..7f7fa5d5c 100644 --- a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/SrtBitrateRegulator.kt +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/SrtBitrateRegulator.kt @@ -34,14 +34,11 @@ abstract class SrtBitrateRegulator( bitrateRegulatorConfig: BitrateRegulatorConfig, onVideoTargetBitrateChange: ((Int) -> Unit), onAudioTargetBitrateChange: ((Int) -> Unit) -) : BitrateRegulator( +) : BitrateRegulator( bitrateRegulatorConfig, onVideoTargetBitrateChange, onAudioTargetBitrateChange ) { - override fun update(metrics: Any, currentVideoBitrate: Int, currentAudioBitrate: Int) = - update(metrics as Stats, currentVideoBitrate, currentAudioBitrate) - /** * Call regularly to get new SRT metrics * @@ -49,14 +46,14 @@ abstract class SrtBitrateRegulator( * @param currentVideoBitrate current video bitrate target in bits/s. * @param currentAudioBitrate current audio bitrate target in bits/s. */ - abstract fun update(metrics: Stats, currentVideoBitrate: Int, currentAudioBitrate: Int) + abstract override fun update(metrics: Stats, currentVideoBitrate: Int, currentAudioBitrate: Int) /** * Factory interface you must use to create a [SrtBitrateRegulator] object. * If you want to create a custom SRT bitrate regulation implementation, create a factory that * implements this interface. */ - interface Factory : IBitrateRegulator.Factory { + interface Factory : IBitrateRegulator.Factory { /** * Creates a [SrtBitrateRegulator] object from given parameters * diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/SrtBitrateRegulatorControllerFactories.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/SrtBitrateRegulatorControllerFactories.kt index 5d56a545a..c7537284d 100644 --- a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/SrtBitrateRegulatorControllerFactories.kt +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/SrtBitrateRegulatorControllerFactories.kt @@ -16,26 +16,26 @@ package io.github.thibaultbee.streampack.ext.srt.regulator.controllers import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig -import io.github.thibaultbee.streampack.core.regulator.controllers.SimpleBitrateRegulatorController -import io.github.thibaultbee.streampack.core.regulator.controllers.SimpleBitrateRegulatorController.Companion.DEFAULT_POLLING_TIME_IN_MS +import io.github.thibaultbee.streampack.core.regulator.controllers.IntervalBitrateRegulatorController +import io.github.thibaultbee.streampack.core.regulator.controllers.IntervalBitrateRegulatorController.Companion.DEFAULT_POLLING_TIME_IN_MS import io.github.thibaultbee.streampack.ext.srt.regulator.DummySrtBitrateRegulator import io.github.thibaultbee.streampack.ext.srt.regulator.SrtBitrateRegulator /** - * A [SimpleBitrateRegulatorController.Factory] for [SrtBitrateRegulator]. + * A [IntervalBitrateRegulatorController.Factory] for [SrtBitrateRegulator]. * * @param bitrateRegulatorFactory the [SrtBitrateRegulator.Factory] implementation. Use it to make your own bitrate regulator. * @param bitrateRegulatorConfig bitrate regulator configuration * @param pollingTimeInMs delay between each call to [IBitrateRegulator.update] * - * @see SimpleBitrateRegulatorController.Factory + * @see IntervalBitrateRegulatorController.Factory * @see DummySrtBitrateRegulator.Factory */ -fun simpleSrtBitrateRegulatorControllerFactory( +fun intervalSrtBitrateRegulatorControllerFactory( bitrateRegulatorFactory: SrtBitrateRegulator.Factory = DummySrtBitrateRegulator.Factory(), bitrateRegulatorConfig: BitrateRegulatorConfig = BitrateRegulatorConfig(), pollingTimeInMs: Long = DEFAULT_POLLING_TIME_IN_MS -) = SimpleBitrateRegulatorController.Factory( +) = IntervalBitrateRegulatorController.Factory( bitrateRegulatorFactory, bitrateRegulatorConfig, pollingTimeInMs diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/utils/SrtStatsHelper.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/utils/SrtStatsHelper.kt new file mode 100644 index 000000000..e337b3575 --- /dev/null +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/utils/SrtStatsHelper.kt @@ -0,0 +1,107 @@ +/* + * Copyright (C) 2026 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.ext.srt.utils + +import io.github.thibaultbee.srtdroid.core.models.Stats + +object SrtStatsHelper { + val ZERO = Stats( + msTimeStamp = 0L, + pktSentTotal = 0L, + pktRecvTotal = 0L, + pktSndLossTotal = 0, + pktRcvLossTotal = 0, + pktRetransTotal = 0, + pktSentACKTotal = 0, + pktRecvACKTotal = 0, + pktSentNAKTotal = 0, + pktRecvNAKTotal = 0, + usSndDurationTotal = 0, + pktSndDropTotal = 0, + pktRcvDropTotal = 0, + pktRcvUndecryptTotal = 0, + byteSentTotal = 0, + byteRecvTotal = 0, + byteRcvLossTotal = 0, + byteRetransTotal = 0, + byteSndDropTotal = 0, + byteRcvDropTotal = 0, + byteRcvUndecryptTotal = 0, + pktSent = 0, + pktRecv = 0, + pktSndLoss = 0, + pktRcvLoss = 0, + pktRetrans = 0, + pktRcvRetrans = 0, + pktSentACK = 0, + pktRecvACK = 0, + pktSentNAK = 0, + pktRecvNAK = 0, + mbpsSendRate = 0.0, + mbpsRecvRate = 0.0, + usSndDuration = 0, + pktReorderDistance = 0, + pktRcvAvgBelatedTime = 0.0, + pktRcvBelated = 0, + pktSndDrop = 0, + pktRcvDrop = 0, + pktRcvUndecrypt = 0, + byteSent = 0, + byteRecv = 0, + byteRcvLoss = 0, + byteRetrans = 0, + byteSndDrop = 0, + byteRcvDrop = 0, + byteRcvUndecrypt = 0, + usPktSndPeriod = 0.0, + pktFlowWindow = 0, + pktCongestionWindow = 0, + pktFlightSize = 0, + msRTT = 0.0, + mbpsBandwidth = 0.0, + byteAvailSndBuf = 0, + byteAvailRcvBuf = 0, + mbpsMaxBW = 0.0, + byteMSS = 0, + pktSndBuf = 0, + byteSndBuf = 0, + msSndBuf = 0, + msSndTsbPdDelay = 0, + pktRcvBuf = 0, + byteRcvBuf = 0, + msRcvBuf = 0, + msRcvTsbPdDelay = 0, + pktSndFilterExtraTotal = 0, + pktRcvFilterExtraTotal = 0, + pktRcvFilterSupplyTotal = 0, + pktRcvFilterLossTotal = 0, + pktSndFilterExtra = 0, + pktRcvFilterExtra = 0, + pktRcvFilterSupply = 0, + pktRcvFilterLoss = 0, + pktReorderTolerance = 0, + pktSentUniqueTotal = 0, + pktRecvUniqueTotal = 0, + byteSentUniqueTotal = 0, + byteRecvUniqueTotal = 0, + pktSentUnique = 0, + pktRecvUnique = 0, + byteSentUnique = 0, + byteRecvUnique = 0 + ) +} + + From 13e49e5a77502fd015c0dbd5d7ac2c1d581db5b4 Mon Sep 17 00:00:00 2001 From: ThibaultBee <37510686+ThibaultBee@users.noreply.github.com> Date: Tue, 19 May 2026 08:21:07 +0200 Subject: [PATCH 4/5] fix(*): fallback to `Any` (instead of generics) for the dynamic endpoint --- .../elements/endpoints/DynamicEndpoint.kt | 4 +-- .../core/elements/interfaces/WithMetrics.kt | 4 +-- .../core/regulator/BitrateRegulator.kt | 4 +-- .../core/regulator/IBitrateRegulator.kt | 8 +++--- .../controllers/BitrateRegulatorController.kt | 4 +-- .../IntervalBitrateRegulatorController.kt | 13 +++++----- .../rtmp/elements/endpoints/RtmpEndpoint.kt | 2 +- .../rtmp/regulator/RtmpBitrateRegulator.kt | 26 ++++++++++++++++--- .../endpoints/composites/sinks/SrtSink.kt | 5 +++- .../ext/srt/regulator/SrtBitrateRegulator.kt | 26 ++++++++++++++++--- 10 files changed, 69 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt index 38d17798b..2ba48e04d 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt @@ -49,7 +49,7 @@ open class DynamicEndpoint( private val context: Context, private val defaultDispatcher: CoroutineDispatcher, private val ioDispatcher: CoroutineDispatcher -) : IEndpointInternal, WithMetrics { +) : IEndpointInternal, WithMetrics { private val coroutineScope = CoroutineScope(defaultDispatcher) private val mutex = Mutex() @@ -88,7 +88,7 @@ open class DynamicEndpoint( override val metrics: Any get() { val endpoint = endpoint ?: throw IllegalStateException("Endpoint is not opened") - return (endpoint as? WithMetrics<*>)?.metrics + return (endpoint as? WithMetrics)?.metrics ?: throw UnsupportedOperationException("Current endpoint does not support metrics") } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/interfaces/WithMetrics.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/interfaces/WithMetrics.kt index 1f9602754..8bc07f239 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/interfaces/WithMetrics.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/interfaces/WithMetrics.kt @@ -15,9 +15,9 @@ */ package io.github.thibaultbee.streampack.core.elements.interfaces -interface WithMetrics { +interface WithMetrics { /** * Metrics of the element. */ - val metrics: T + val metrics: Any } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/BitrateRegulator.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/BitrateRegulator.kt index 4ad6b4091..ba7c86fb4 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/BitrateRegulator.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/BitrateRegulator.kt @@ -27,8 +27,8 @@ import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfi * @param onVideoTargetBitrateChange call when you have to change video bitrate * @param onAudioTargetBitrateChange call when you have to change audio bitrate */ -abstract class BitrateRegulator( +abstract class BitrateRegulator( protected val bitrateRegulatorConfig: BitrateRegulatorConfig, protected val onVideoTargetBitrateChange: ((Int) -> Unit), protected val onAudioTargetBitrateChange: ((Int) -> Unit) -) : IBitrateRegulator +) : IBitrateRegulator diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/IBitrateRegulator.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/IBitrateRegulator.kt index 5fda2d569..4cd1a5a11 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/IBitrateRegulator.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/IBitrateRegulator.kt @@ -20,7 +20,7 @@ import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfi /** * Interface to implement a bitrate regulator. */ -interface IBitrateRegulator { +interface IBitrateRegulator { /** * Calls regularly to get new metrics * @@ -28,14 +28,14 @@ interface IBitrateRegulator { * @param currentVideoBitrate current video bitrate target in bits/s. * @param currentAudioBitrate current audio bitrate target in bits/s. */ - fun update(metrics: T, currentVideoBitrate: Int, currentAudioBitrate: Int) + fun update(metrics: Any, currentVideoBitrate: Int, currentAudioBitrate: Int) /** * Factory interface you must use to create a [BitrateRegulator] object. * If you want to create a custom bitrate regulation implementation, create a factory that * implements this interface. */ - interface Factory { + interface Factory { /** * Creates a [BitrateRegulator] object from given parameters * @@ -48,6 +48,6 @@ interface IBitrateRegulator { bitrateRegulatorConfig: BitrateRegulatorConfig, onVideoTargetBitrateChange: ((Int) -> Unit), onAudioTargetBitrateChange: ((Int) -> Unit) - ): BitrateRegulator + ): BitrateRegulator } } \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/BitrateRegulatorController.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/BitrateRegulatorController.kt index fab35bc86..a2802f108 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/BitrateRegulatorController.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/BitrateRegulatorController.kt @@ -34,8 +34,8 @@ import kotlinx.coroutines.CoroutineDispatcher abstract class BitrateRegulatorController( private val audioEncoder: IEncoder?, private val videoEncoder: IEncoder?, - private val endpoint: WithMetrics<*>, - private val bitrateRegulatorFactory: IBitrateRegulator.Factory<*>, + private val endpoint: WithMetrics, + private val bitrateRegulatorFactory: IBitrateRegulator.Factory, private val bitrateRegulatorConfig: BitrateRegulatorConfig = BitrateRegulatorConfig() ) : IBitrateRegulatorController { abstract class Factory : IBitrateRegulatorController.Factory { diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/IntervalBitrateRegulatorController.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/IntervalBitrateRegulatorController.kt index 006c9eda2..3a7c675b1 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/IntervalBitrateRegulatorController.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/IntervalBitrateRegulatorController.kt @@ -35,11 +35,11 @@ import kotlinx.coroutines.CoroutineDispatcher * @param bitrateRegulatorConfig bitrate regulator configuration * @param pollingTimeInMs delay between each call to [IBitrateRegulator.update] */ -open class IntervalBitrateRegulatorController( +open class IntervalBitrateRegulatorController( audioEncoder: IEncoder?, videoEncoder: IEncoder, - metricsProvider: WithMetrics, - bitrateRegulatorFactory: IBitrateRegulator.Factory, + metricsProvider: WithMetrics, + bitrateRegulatorFactory: IBitrateRegulator.Factory, coroutineDispatcher: CoroutineDispatcher, bitrateRegulatorConfig: BitrateRegulatorConfig = BitrateRegulatorConfig(), pollingTimeInMs: Long = DEFAULT_POLLING_TIME_IN_MS @@ -84,8 +84,8 @@ open class IntervalBitrateRegulatorController( const val DEFAULT_POLLING_TIME_IN_MS = 500L } - class Factory( - private val bitrateRegulatorFactory: IBitrateRegulator.Factory, + class Factory( + private val bitrateRegulatorFactory: IBitrateRegulator.Factory, private val bitrateRegulatorConfig: BitrateRegulatorConfig = BitrateRegulatorConfig(), private val pollingTimeInMs: Long = DEFAULT_POLLING_TIME_IN_MS ) : BitrateRegulatorController.Factory() { @@ -106,8 +106,7 @@ open class IntervalBitrateRegulatorController( } else { null } - @Suppress("UNCHECKED_CAST") - val endpoint = pipelineOutput.endpoint as WithMetrics + val endpoint = pipelineOutput.endpoint as WithMetrics return IntervalBitrateRegulatorController( audioEncoder, videoEncoder, diff --git a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/elements/endpoints/RtmpEndpoint.kt b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/elements/endpoints/RtmpEndpoint.kt index 0a61556f3..07bf9360c 100644 --- a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/elements/endpoints/RtmpEndpoint.kt +++ b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/elements/endpoints/RtmpEndpoint.kt @@ -64,7 +64,7 @@ import kotlin.concurrent.atomics.ExperimentalAtomicApi @OptIn(ExperimentalAtomicApi::class) class RtmpEndpoint internal constructor( defaultDispatcher: CoroutineDispatcher, val ioDispatcher: CoroutineDispatcher -) : IEndpointInternal, WithMetrics { +) : IEndpointInternal, WithMetrics { private val coroutineScope = CoroutineScope(SupervisorJob() + defaultDispatcher) private val mutex = Mutex() diff --git a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/RtmpBitrateRegulator.kt b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/RtmpBitrateRegulator.kt index 6cf519dc3..a1515745f 100644 --- a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/RtmpBitrateRegulator.kt +++ b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/RtmpBitrateRegulator.kt @@ -17,6 +17,7 @@ package io.github.thibaultbee.streampack.ext.rtmp.regulator import io.github.komedia.komuxer.rtmp.util.metrics.RtmpMetrics import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig +import io.github.thibaultbee.streampack.core.logger.Logger import io.github.thibaultbee.streampack.core.regulator.BitrateRegulator import io.github.thibaultbee.streampack.core.regulator.IBitrateRegulator @@ -34,7 +35,7 @@ abstract class RtmpBitrateRegulator( bitrateRegulatorConfig: BitrateRegulatorConfig, onVideoTargetBitrateChange: ((Int) -> Unit), onAudioTargetBitrateChange: ((Int) -> Unit) -) : BitrateRegulator( +) : BitrateRegulator( bitrateRegulatorConfig, onVideoTargetBitrateChange, onAudioTargetBitrateChange @@ -46,18 +47,37 @@ abstract class RtmpBitrateRegulator( * @param currentVideoBitrate current video bitrate target in bits/s. * @param currentAudioBitrate current audio bitrate target in bits/s. */ - abstract override fun update( + override fun update(metrics: Any, currentVideoBitrate: Int, currentAudioBitrate: Int) { + if (metrics !is RtmpMetrics) { + Logger.w(TAG, "Expect RtmpMetrics but got ${metrics::class.java.simpleName}") + return + } + update(metrics, currentVideoBitrate, currentAudioBitrate) + } + + /** + * Call regularly to get new RTMP metrics + * + * @param metrics RTMP transmission metrics + * @param currentVideoBitrate current video bitrate target in bits/s. + * @param currentAudioBitrate current audio bitrate target in bits/s. + */ + abstract fun update( metrics: RtmpMetrics, currentVideoBitrate: Int, currentAudioBitrate: Int ) + + companion object { + private const val TAG = "RtmpBitrateRegulator" + } /** * Factory interface you must use to create a [RtmpBitrateRegulator] object. * If you want to create a custom RTMP bitrate regulation implementation, create a factory that * implements this interface. */ - interface Factory : IBitrateRegulator.Factory { + interface Factory : IBitrateRegulator.Factory { /** * Creates a [RtmpBitrateRegulator] object from given parameters * diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/composites/sinks/SrtSink.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/composites/sinks/SrtSink.kt index cb1f2bfc6..ca8a9d17c 100644 --- a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/composites/sinks/SrtSink.kt +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/composites/sinks/SrtSink.kt @@ -39,7 +39,7 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asStateFlow class SrtSink(private val coroutineDispatcher: CoroutineDispatcher) : AbstractSink(), - WithMetrics { + WithMetrics { override val supportedSinkTypes: List = listOf(MediaSinkType.SRT) private var socket: CoroutineSrtSocket? = null @@ -57,6 +57,9 @@ class SrtSink(private val coroutineDispatcher: CoroutineDispatcher) : AbstractSi private val _isOpenFlow = MutableStateFlow(false) override val isOpenFlow = _isOpenFlow.asStateFlow() + fun bitstats(clear: Boolean, instantaneous: Boolean) = + socket?.bistats(clear = clear, instantaneous = instantaneous) ?: SrtStatsHelper.ZERO + override fun configure(config: SinkConfiguration) { bitrate = config.streamConfigs.sumOf { it.startBitrate.toLong() } } diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/SrtBitrateRegulator.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/SrtBitrateRegulator.kt index 7f7fa5d5c..a0124646c 100644 --- a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/SrtBitrateRegulator.kt +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/SrtBitrateRegulator.kt @@ -17,6 +17,7 @@ package io.github.thibaultbee.streampack.ext.srt.regulator import io.github.thibaultbee.srtdroid.core.models.Stats import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig +import io.github.thibaultbee.streampack.core.logger.Logger import io.github.thibaultbee.streampack.core.regulator.BitrateRegulator import io.github.thibaultbee.streampack.core.regulator.IBitrateRegulator @@ -34,7 +35,7 @@ abstract class SrtBitrateRegulator( bitrateRegulatorConfig: BitrateRegulatorConfig, onVideoTargetBitrateChange: ((Int) -> Unit), onAudioTargetBitrateChange: ((Int) -> Unit) -) : BitrateRegulator( +) : BitrateRegulator( bitrateRegulatorConfig, onVideoTargetBitrateChange, onAudioTargetBitrateChange @@ -46,14 +47,33 @@ abstract class SrtBitrateRegulator( * @param currentVideoBitrate current video bitrate target in bits/s. * @param currentAudioBitrate current audio bitrate target in bits/s. */ - abstract override fun update(metrics: Stats, currentVideoBitrate: Int, currentAudioBitrate: Int) + override fun update(metrics: Any, currentVideoBitrate: Int, currentAudioBitrate: Int) { + if (metrics !is Stats) { + Logger.w(TAG, "Expect Stats but got ${metrics::class.java.simpleName}") + return + } + update(metrics, currentVideoBitrate, currentAudioBitrate) + } + + /** + * Call regularly to get new SRT metrics + * + * @param metrics SRT transmission metrics + * @param currentVideoBitrate current video bitrate target in bits/s. + * @param currentAudioBitrate current audio bitrate target in bits/s. + */ + abstract fun update(metrics: Stats, currentVideoBitrate: Int, currentAudioBitrate: Int) + + companion object { + private const val TAG = "SrtBitrateRegulator" + } /** * Factory interface you must use to create a [SrtBitrateRegulator] object. * If you want to create a custom SRT bitrate regulation implementation, create a factory that * implements this interface. */ - interface Factory : IBitrateRegulator.Factory { + interface Factory : IBitrateRegulator.Factory { /** * Creates a [SrtBitrateRegulator] object from given parameters * From 1e6011afe74821061d90745ae5d144899e6f9752 Mon Sep 17 00:00:00 2001 From: ThibaultBee <37510686+ThibaultBee@users.noreply.github.com> Date: Tue, 9 Jun 2026 21:53:46 +0200 Subject: [PATCH 5/5] tmp --- .../elements/endpoints/DynamicEndpoint.kt | 9 +- .../core/elements/interfaces/WithMetrics.kt | 61 ++++++ .../sources/video/camera/CameraSettings.kt | 22 +- .../elements/utils/extensions/Extensions.kt | 14 +- .../IntervalBitrateRegulatorController.kt | 7 +- .../rtmp/elements/endpoints/RtmpEndpoint.kt | 22 +- .../rtmp/regulator/RtmpBitrateRegulator.kt | 5 +- .../regulator/SimpleRtmpBitrateRegulator.kt | 23 +- .../endpoints/composites/sinks/SrtSink.kt | 13 +- .../srt/regulator/DummySrtBitrateRegulator.kt | 14 +- .../streampack/ext/srt/utils/SrtMetrics.kt | 196 ++++++++++++++++++ 11 files changed, 325 insertions(+), 61 deletions(-) create mode 100644 extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/utils/SrtMetrics.kt diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt index 2ba48e04d..65b839fc0 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt @@ -25,8 +25,9 @@ import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxer import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.data.TSServiceInfo import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.ContentSink import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.FileSink +import io.github.thibaultbee.streampack.core.elements.interfaces.EndpointMetrics +import io.github.thibaultbee.streampack.core.elements.interfaces.WithEndpointMetrics import io.github.thibaultbee.streampack.core.elements.utils.ConflatedJob -import io.github.thibaultbee.streampack.core.elements.interfaces.WithMetrics import io.github.thibaultbee.streampack.core.logger.Logger import io.github.thibaultbee.streampack.core.pipelines.IDispatcherProvider import kotlinx.coroutines.CoroutineDispatcher @@ -49,7 +50,7 @@ open class DynamicEndpoint( private val context: Context, private val defaultDispatcher: CoroutineDispatcher, private val ioDispatcher: CoroutineDispatcher -) : IEndpointInternal, WithMetrics { +) : IEndpointInternal, WithEndpointMetrics { private val coroutineScope = CoroutineScope(defaultDispatcher) private val mutex = Mutex() @@ -85,10 +86,10 @@ open class DynamicEndpoint( override fun getInfo(type: MediaDescriptor.Type) = getEndpoint(type).getInfo(type) - override val metrics: Any + override val metrics: EndpointMetrics<*> get() { val endpoint = endpoint ?: throw IllegalStateException("Endpoint is not opened") - return (endpoint as? WithMetrics)?.metrics + return (endpoint as? WithEndpointMetrics)?.metrics ?: throw UnsupportedOperationException("Current endpoint does not support metrics") } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/interfaces/WithMetrics.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/interfaces/WithMetrics.kt index 8bc07f239..51e4d35ff 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/interfaces/WithMetrics.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/interfaces/WithMetrics.kt @@ -15,9 +15,70 @@ */ package io.github.thibaultbee.streampack.core.elements.interfaces +import kotlin.time.Duration + +/** + * Minimal metrics interface + */ interface WithMetrics { /** * Metrics of the element. */ val metrics: Any } + +/** + * Endpoint metrics interface + */ +interface EndpointMetrics { + /** + * The duration of the interval + */ + val uptime: Duration + + /** + * The number of packets sent. + */ + val packetsSent: Long + + /** + * The number of packets dropped before sending (e.g. due to congestion or timeout). + */ + val packetsSendDropped: Long + + /** + * The number of packets lost during the transmission. + */ + val packetsSendLost: Long + + /** + * The number of bytes successfully sent. + */ + val bytesSent: Long + + /** + * The number of bytes dropped before sending (e.g. due to congestion or timeout). + */ + val bytesSendDropped: Long + + /** + * The protocol-specific metrics wrapper. + */ + val rawMetrics: T +} + +/** + * The total sent bitrate in bits per second (bps). + */ +val EndpointMetrics<*>.totalSentBitrateInBps: Long + get() = uptime.inWholeMilliseconds.let { if (it == 0L) 0L else (bytesSent * 8000) / it } + +/** + * A specific [WithMetrics] for [EndpointMetrics]. + */ +interface WithEndpointMetrics : WithMetrics { + /** + * Metrics of the element + */ + override val metrics: EndpointMetrics<*> +} \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/sources/video/camera/CameraSettings.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/sources/video/camera/CameraSettings.kt index a1392623a..c6c85a4c1 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/sources/video/camera/CameraSettings.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/sources/video/camera/CameraSettings.kt @@ -56,7 +56,7 @@ import io.github.thibaultbee.streampack.core.elements.sources.video.camera.exten import io.github.thibaultbee.streampack.core.elements.sources.video.camera.extensions.sensitivityRange import io.github.thibaultbee.streampack.core.elements.sources.video.camera.extensions.zoomRatioRange import io.github.thibaultbee.streampack.core.elements.sources.video.camera.utils.CaptureResultListener -import io.github.thibaultbee.streampack.core.elements.utils.extensions.clamp +import io.github.thibaultbee.streampack.core.elements.utils.extensions.coerceIn import io.github.thibaultbee.streampack.core.elements.utils.extensions.isApplicationPortrait import io.github.thibaultbee.streampack.core.elements.utils.extensions.isNormalized import io.github.thibaultbee.streampack.core.elements.utils.extensions.launchIn @@ -533,7 +533,7 @@ class CameraSettings internal constructor( suspend fun setSensorSensitivity(sensorSensitivity: Int) { cameraSettings.set( CaptureRequest.SENSOR_SENSITIVITY, - sensorSensitivity.clamp(availableSensorSensitivityRange) + sensorSensitivity.coerceIn(availableSensorSensitivityRange) ) cameraSettings.applyRepeatingSession() } @@ -693,7 +693,7 @@ class CameraSettings internal constructor( suspend fun setCompensation(compensation: Int) { cameraSettings.set( CaptureRequest.CONTROL_AE_EXPOSURE_COMPENSATION, - compensation.clamp(availableCompensationRange) + compensation.coerceIn(availableCompensationRange) ) cameraSettings.applyRepeatingSession() } @@ -777,7 +777,7 @@ class CameraSettings internal constructor( suspend fun onPinch(scale: Float) { val scaledRatio: Float = getZoomRatio() * speedUpZoomByX(scale, 2) // Clamp the ratio with the zoom range. - setZoomRatio(scaledRatio.clamp(availableRatioRange.lower, availableRatioRange.upper)) + setZoomRatio(scaledRatio.coerceIn(availableRatioRange.lower, availableRatioRange.upper)) } private fun speedUpZoomByX(scaleFactor: Float, ratio: Int): Float { @@ -825,7 +825,7 @@ class CameraSettings internal constructor( @RequiresPermission(Manifest.permission.CAMERA) override suspend fun setZoomRatio(zoomRatio: Float) { mutex.withLock { - val clampedValue = zoomRatio.clamp(availableRatioRange) + val clampedValue = zoomRatio.coerceIn(availableRatioRange) if (clampedValue == persistentZoomRatio) { return@withLock } @@ -905,7 +905,7 @@ class CameraSettings internal constructor( return } cameraSettings.set( - CaptureRequest.CONTROL_ZOOM_RATIO, zoomRatio.clamp(availableRatioRange) + CaptureRequest.CONTROL_ZOOM_RATIO, zoomRatio.coerceIn(availableRatioRange) ) cameraSettings.applyRepeatingSession() notifyZoomListeners(zoomRatio) @@ -1021,7 +1021,7 @@ class CameraSettings internal constructor( @RequiresPermission(Manifest.permission.CAMERA) suspend fun setLensDistance(lensDistance: Float) { cameraSettings.set( - CaptureRequest.LENS_FOCUS_DISTANCE, lensDistance.clamp(availableLensDistanceRange) + CaptureRequest.LENS_FOCUS_DISTANCE, lensDistance.coerceIn(availableLensDistanceRange) ) cameraSettings.applyRepeatingSession() } @@ -1618,10 +1618,10 @@ class CameraSettings internal constructor( (centerY + height / 2).toInt() ) - focusRect.left = focusRect.left.clamp(cropRegion.right, cropRegion.left) - focusRect.right = focusRect.right.clamp(cropRegion.right, cropRegion.left) - focusRect.top = focusRect.top.clamp(cropRegion.bottom, cropRegion.top) - focusRect.bottom = focusRect.bottom.clamp(cropRegion.bottom, cropRegion.top) + focusRect.left = focusRect.left.coerceIn(cropRegion.right, cropRegion.left) + focusRect.right = focusRect.right.coerceIn(cropRegion.right, cropRegion.left) + focusRect.top = focusRect.top.coerceIn(cropRegion.bottom, cropRegion.top) + focusRect.bottom = focusRect.bottom.coerceIn(cropRegion.bottom, cropRegion.top) return MeteringRectangle(focusRect, DEFAULT_METERING_WEIGHT_MAX) } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/Extensions.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/Extensions.kt index 5d5fba851..9f59bc79a 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/Extensions.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/Extensions.kt @@ -43,16 +43,16 @@ internal fun Any.numOfBits(): Int { } } -internal fun > T.clamp(min: T, max: T): T { - return if (max >= min) { - if (this < min) min else if (this > max) max else this - } else { - if (this < max) max else if (this > min) min else this +internal fun > T.coerceIn(min: T, max: T): T { + return when { + this < min -> min + this > max -> max + else -> this } } -internal fun > T.clamp(range: Range) = - this.clamp(range.lower, range.upper) +internal fun > T.coerceIn(range: Range) = + this.coerceIn(range.lower, range.upper) internal val PointF.isNormalized: Boolean get() = x in 0f..1f && y in 0f..1f diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/IntervalBitrateRegulatorController.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/IntervalBitrateRegulatorController.kt index 3a7c675b1..0f39d2e45 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/IntervalBitrateRegulatorController.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/IntervalBitrateRegulatorController.kt @@ -19,6 +19,7 @@ import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfi import io.github.thibaultbee.streampack.core.elements.encoders.IEncoder import io.github.thibaultbee.streampack.core.elements.interfaces.WithMetrics import io.github.thibaultbee.streampack.core.elements.utils.CoroutineScheduler +import io.github.thibaultbee.streampack.core.elements.utils.extensions.coerceIn import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IConfigurableAudioEncodingPipelineOutput import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IConfigurableVideoEncodingPipelineOutput import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IEncodingPipelineOutput @@ -55,10 +56,10 @@ open class IntervalBitrateRegulatorController( */ private val bitrateRegulator = bitrateRegulatorFactory.newBitrateRegulator( bitrateRegulatorConfig, - { - videoEncoder.bitrate = it + onVideoTargetBitrateChange = { + videoEncoder.bitrate = it.coerceIn(bitrateRegulatorConfig.videoBitrateRange) }, - { /* Do nothing for audio */ } + onAudioTargetBitrateChange = { /* Do nothing for audio */ } ) /** diff --git a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/elements/endpoints/RtmpEndpoint.kt b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/elements/endpoints/RtmpEndpoint.kt index 07bf9360c..9d235b398 100644 --- a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/elements/endpoints/RtmpEndpoint.kt +++ b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/elements/endpoints/RtmpEndpoint.kt @@ -32,7 +32,8 @@ import io.github.thibaultbee.streampack.core.elements.endpoints.ClosedException import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpoint import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpointInternal import io.github.thibaultbee.streampack.core.elements.endpoints.composites.CompositeEndpoint.EndpointInfo -import io.github.thibaultbee.streampack.core.elements.interfaces.WithMetrics +import io.github.thibaultbee.streampack.core.elements.interfaces.EndpointMetrics +import io.github.thibaultbee.streampack.core.elements.interfaces.WithEndpointMetrics import io.github.thibaultbee.streampack.core.elements.utils.ChannelWithCloseableData import io.github.thibaultbee.streampack.core.elements.utils.useConsumeEach import io.github.thibaultbee.streampack.core.logger.Logger @@ -57,6 +58,7 @@ import kotlinx.coroutines.withContext import kotlinx.io.EOFException import java.io.IOException import kotlin.concurrent.atomics.ExperimentalAtomicApi +import kotlin.time.Duration /** * An endpoint that send frame to an RTMP server. @@ -64,7 +66,7 @@ import kotlin.concurrent.atomics.ExperimentalAtomicApi @OptIn(ExperimentalAtomicApi::class) class RtmpEndpoint internal constructor( defaultDispatcher: CoroutineDispatcher, val ioDispatcher: CoroutineDispatcher -) : IEndpointInternal, WithMetrics { +) : IEndpointInternal, WithEndpointMetrics { private val coroutineScope = CoroutineScope(SupervisorJob() + defaultDispatcher) private val mutex = Mutex() @@ -103,7 +105,7 @@ class RtmpEndpoint internal constructor( private var startUpTimestamp = INVALID_TIMESTAMP private val timestampMutex = Mutex() - override val metrics: RtmpMetrics + private val syncMetrics: RtmpMetrics get() { val metrics = rtmpClient?.metrics ?: return RtmpMetrics.ZERO @@ -119,6 +121,20 @@ class RtmpEndpoint internal constructor( } } + override val metrics: EndpointMetrics + get() { + val metrics = syncMetrics + return object : EndpointMetrics { + override val uptime: Duration = metrics.uptime + override val packetsSent = metrics.messagesSent + override val packetsSendDropped = metrics.messagesSendDropped + override val packetsSendLost = 0L + override val bytesSent = metrics.totalBytesSent + override val bytesSendDropped = metrics.payloadSendDroppedSize + override val rawMetrics: RtmpMetrics = metrics + } + } + private val _isOpenFlow = MutableStateFlow(false) override val isOpenFlow = _isOpenFlow.asStateFlow() diff --git a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/RtmpBitrateRegulator.kt b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/RtmpBitrateRegulator.kt index a1515745f..96ce9491d 100644 --- a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/RtmpBitrateRegulator.kt +++ b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/RtmpBitrateRegulator.kt @@ -17,6 +17,7 @@ package io.github.thibaultbee.streampack.ext.rtmp.regulator import io.github.komedia.komuxer.rtmp.util.metrics.RtmpMetrics import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig +import io.github.thibaultbee.streampack.core.elements.interfaces.EndpointMetrics import io.github.thibaultbee.streampack.core.logger.Logger import io.github.thibaultbee.streampack.core.regulator.BitrateRegulator import io.github.thibaultbee.streampack.core.regulator.IBitrateRegulator @@ -63,11 +64,11 @@ abstract class RtmpBitrateRegulator( * @param currentAudioBitrate current audio bitrate target in bits/s. */ abstract fun update( - metrics: RtmpMetrics, + metrics: EndpointMetrics, currentVideoBitrate: Int, currentAudioBitrate: Int ) - + companion object { private const val TAG = "RtmpBitrateRegulator" } diff --git a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/SimpleRtmpBitrateRegulator.kt b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/SimpleRtmpBitrateRegulator.kt index de1b248d5..dad0013d1 100644 --- a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/SimpleRtmpBitrateRegulator.kt +++ b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/SimpleRtmpBitrateRegulator.kt @@ -17,6 +17,7 @@ package io.github.thibaultbee.streampack.ext.rtmp.regulator import io.github.komedia.komuxer.rtmp.util.metrics.RtmpMetrics import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig +import io.github.thibaultbee.streampack.core.elements.interfaces.EndpointMetrics import io.github.thibaultbee.streampack.core.regulator.BitrateRegulator import kotlin.math.max import kotlin.math.min @@ -49,31 +50,25 @@ class SimpleRtmpBitrateRegulator( * @param currentVideoBitrate current video bitrate target in bits/s. * @param currentAudioBitrate current audio bitrate target in bits/s. */ - override fun update(metrics: RtmpMetrics, currentVideoBitrate: Int, currentAudioBitrate: Int) { - if (metrics.messagesSendDropped > 0) { + override fun update( + metrics: EndpointMetrics, + currentVideoBitrate: Int, + currentAudioBitrate: Int + ) { + if (metrics.rawMetrics.messagesSendDropped > 0) { // Detected packet loss - quickly react val newVideoBitrate = currentVideoBitrate - max( currentVideoBitrate * 20 / 100, // too late - drop bitrate by 20 % MINIMUM_DECREASE_THRESHOLD // getting down by 100000 b/s minimum ) - onVideoTargetBitrateChange( - max( - newVideoBitrate, - bitrateRegulatorConfig.videoBitrateRange.lower - ) - ) + onVideoTargetBitrateChange(newVideoBitrate) } else if (currentVideoBitrate < bitrateRegulatorConfig.videoBitrateRange.upper) { // Try to increase to the max target val newVideoBitrate = currentVideoBitrate + min( (bitrateRegulatorConfig.videoBitrateRange.upper - currentVideoBitrate) * 50 / 100, // getting slower when reaching target bitrate MAXIMUM_INCREASE_THRESHOLD // not increasing to fast ) - onVideoTargetBitrateChange( - min( - newVideoBitrate, - bitrateRegulatorConfig.videoBitrateRange.upper - ) - ) + onVideoTargetBitrateChange(newVideoBitrate) } } diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/composites/sinks/SrtSink.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/composites/sinks/SrtSink.kt index ca8a9d17c..6be24a284 100644 --- a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/composites/sinks/SrtSink.kt +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/composites/sinks/SrtSink.kt @@ -20,7 +20,6 @@ import io.github.thibaultbee.srtdroid.core.enums.SockOpt import io.github.thibaultbee.srtdroid.core.enums.Transtype import io.github.thibaultbee.srtdroid.core.models.MsgCtrl import io.github.thibaultbee.srtdroid.core.models.SrtUrl.Mode -import io.github.thibaultbee.srtdroid.core.models.Stats import io.github.thibaultbee.srtdroid.ktx.CoroutineSrtSocket import io.github.thibaultbee.srtdroid.ktx.connect import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor @@ -30,16 +29,18 @@ import io.github.thibaultbee.streampack.core.elements.endpoints.composites.data. import io.github.thibaultbee.streampack.core.elements.endpoints.composites.data.SrtPacket import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.AbstractSink import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.SinkConfiguration -import io.github.thibaultbee.streampack.core.elements.interfaces.WithMetrics +import io.github.thibaultbee.streampack.core.elements.interfaces.WithEndpointMetrics import io.github.thibaultbee.streampack.core.logger.Logger import io.github.thibaultbee.streampack.ext.srt.configuration.mediadescriptor.SrtMediaDescriptor +import io.github.thibaultbee.streampack.ext.srt.utils.SrtMetrics +import io.github.thibaultbee.streampack.ext.srt.utils.SrtRawMetrics import io.github.thibaultbee.streampack.ext.srt.utils.SrtStatsHelper import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asStateFlow class SrtSink(private val coroutineDispatcher: CoroutineDispatcher) : AbstractSink(), - WithMetrics { + WithEndpointMetrics { override val supportedSinkTypes: List = listOf(MediaSinkType.SRT) private var socket: CoroutineSrtSocket? = null @@ -48,11 +49,13 @@ class SrtSink(private val coroutineDispatcher: CoroutineDispatcher) : AbstractSi private var bitrate = 0L + private val srtRawMetrics = SrtRawMetrics { socket } + /** * Get SRT stats */ - override val metrics: Stats - get() = socket?.bistats(clear = true, instantaneous = true) ?: SrtStatsHelper.ZERO + override val metrics: SrtMetrics + get() = SrtMetrics(srtRawMetrics) private val _isOpenFlow = MutableStateFlow(false) override val isOpenFlow = _isOpenFlow.asStateFlow() diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/DummySrtBitrateRegulator.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/DummySrtBitrateRegulator.kt index 65101a21b..a87da60cb 100644 --- a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/DummySrtBitrateRegulator.kt +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/DummySrtBitrateRegulator.kt @@ -72,12 +72,7 @@ class DummySrtBitrateRegulator( } if (newVideoBitrate != 0) { - onVideoTargetBitrateChange( - max( - newVideoBitrate, - bitrateRegulatorConfig.videoBitrateRange.lower - ) - ) // Don't go under videoBitrateRange.lower + onVideoTargetBitrateChange(newVideoBitrate) return } // Can bitrate go upper? @@ -94,12 +89,7 @@ class DummySrtBitrateRegulator( } if (newVideoBitrate != 0) { - onVideoTargetBitrateChange( - max( - newVideoBitrate, - bitrateRegulatorConfig.videoBitrateRange.lower - ) - ) // Don't go under videoBitrateRange.lower + onVideoTargetBitrateChange(newVideoBitrate) return } } diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/utils/SrtMetrics.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/utils/SrtMetrics.kt new file mode 100644 index 000000000..9a0c19429 --- /dev/null +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/utils/SrtMetrics.kt @@ -0,0 +1,196 @@ +/* + * Copyright (C) 2026 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.ext.srt.utils + +import io.github.thibaultbee.srtdroid.core.models.Stats +import io.github.thibaultbee.srtdroid.ktx.CoroutineSrtSocket +import io.github.thibaultbee.streampack.core.elements.interfaces.EndpointMetrics +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds + +fun SrtMetrics(srtRawMetrics: SrtRawMetrics): SrtMetrics { + val stats = srtRawMetrics.bstats(clear = false) + return SrtMetrics( + uptime = stats.msTimeStamp.milliseconds, + packetsSent = stats.pktSentTotal, + packetsReceived = stats.pktRecvTotal, + packetsSendLost = stats.pktSndLossTotal.toLong(), + packetsReceiveLost = stats.pktRcvLossTotal, + packetsRetransmitted = stats.pktRetransTotal, + packetsSendACK = stats.pktSentACKTotal, + packetsReceiveACK = stats.pktRecvACKTotal, + packetsSendNAK = stats.pktSentNAKTotal, + packetsReceiveNAK = stats.pktRecvNAKTotal, + usSndDuration = stats.usSndDurationTotal, + packetsSendDropped = stats.pktSndDropTotal.toLong(), + packetsReceiveDropped = stats.pktRcvDropTotal, + packetsReceiveUndecrypt = stats.pktRcvUndecryptTotal, + bytesSent = stats.byteSentTotal, + bytesReceived = stats.byteRecvTotal, + bytesReceiveLost = stats.byteRcvLossTotal, + bytesRetransmitted = stats.byteRetransTotal, + bytesSendDropped = stats.byteSndDropTotal, + bytesReceiveDropped = stats.byteRcvDropTotal, + bytesReceiveUndecrypt = stats.byteRcvUndecryptTotal, + rawMetrics = srtRawMetrics + ) +} + +data class SrtMetrics( + /** + * The time since the entity is started + */ + override val uptime: Duration, + /** + * The total number of sent data packets, including retransmissions + */ + override val packetsSent: Long, + /** + * The total number of received packets + */ + val packetsReceived: Long, + /** + * The total number of lost packets (sender side) + */ + override val packetsSendLost: Long, + /** + * The total number of lost packets (receiver side) + */ + val packetsReceiveLost: Int, + /** + * The total number of retransmitted packets + */ + val packetsRetransmitted: Int, + /** + * The total number of sent ACK packets + */ + val packetsSendACK: Int, + /** + * The total number of received ACK packets + */ + val packetsReceiveACK: Int, + /** + * The total number of sent NAK packets + */ + val packetsSendNAK: Int, + /** + * The total number of received NAK packets + */ + val packetsReceiveNAK: Int, + /** + * The total time duration when UDT is sending data (idle time exclusive) + */ + val usSndDuration: Long, + /** + * The number of too-late-to-send dropped packets + */ + override val packetsSendDropped: Long, + /** + * The number of too-late-to play missing packets + */ + val packetsReceiveDropped: Int, + /** + * The number of undecrypted packets + */ + val packetsReceiveUndecrypt: Int, + /** + * The total number of sent data bytes, including retransmissions + */ + override val bytesSent: Long, + /** + * The total number of received bytes + */ + val bytesReceived: Long, + /** + * The total number of lost bytes + */ + val bytesReceiveLost: Long, + /** + * The total number of retransmitted bytes + */ + val bytesRetransmitted: Long, + /** + * The number of too-late-to-send dropped bytes + */ + override val bytesSendDropped: Long, + /** + * The number of too-late-to play missing bytes (estimate based on average packet size) + */ + val bytesReceiveDropped: Long, + /** + * The number of undecrypted bytes + */ + val bytesReceiveUndecrypt: Long, + /** + * Raw SRT socket helper + */ + override val rawMetrics: SrtRawMetrics, +) : EndpointMetrics + +/** + * Provides an access to internal SRT APIs. + */ +class SrtRawMetrics internal constructor(private val socketProvider: () -> CoroutineSrtSocket?) { + /** + * Reports the current statistics. + * + * **See Also:** [srt_bstats](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_bstats) + * + * @param clear true if the statistics should be cleared after retrieval + * @return the current [Stats] or null if the socket is not connected + */ + fun bstatsOrNull(clear: Boolean): Stats? { + return socketProvider()?.bstats(clear) + } + + /** + * Reports the current statistics. + * + * **See Also:** [srt_bistats](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_bistats) + * + * @param clear true if the statistics should be cleared after retrieval + * @param instantaneous true if the statistics should use instant data, not moving averages + * @return the current [Stats] if the socket is not connected + */ + fun bistatsOrNull(clear: Boolean, instantaneous: Boolean): Stats? { + return socketProvider()?.bistats(clear, instantaneous) + } +} + +/** + * Reports the current statistics. + * + * **See Also:** [srt_bstats](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_bstats) + * + * @param clear true if the statistics should be cleared after retrieval + * @return the current [Stats] or [SrtStatsHelper.ZERO] if the socket is not connected + */ +fun SrtRawMetrics.bstats(clear: Boolean): Stats { + return bstatsOrNull(clear) ?: SrtStatsHelper.ZERO +} + +/** + * Reports the current statistics. + * + * **See Also:** [srt_bistats](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_bistats) + * + * @param clear true if the statistics should be cleared after retrieval + * @param instantaneous true if the statistics should use instant data, not moving averages + * @return the current [Stats] or [SrtStatsHelper.ZERO] if the socket is not connected + */ +fun SrtRawMetrics.bistats(clear: Boolean, instantaneous: Boolean): Stats { + return bistatsOrNull(clear, instantaneous) ?: SrtStatsHelper.ZERO +} \ No newline at end of file