diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/CombineEndpoint.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/CombineEndpoint.kt index f9d235323..190e95b7e 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/CombineEndpoint.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/CombineEndpoint.kt @@ -115,14 +115,6 @@ open class CombineEndpoint( .reduce { acc, iEndpointInfo -> acc intersect iEndpointInfo } } - /** - * Throws [UnsupportedOperationException] because [CombineEndpoint] does not have metrics. - * - * Call [IEndpoint.metrics] on each endpoint to get their metrics. - */ - override val metrics: Any - get() = throw UnsupportedOperationException("CombineEndpoint does not have metrics.") - private fun createNewStreamId(): Int { var i = 0 while (endpointsToStreamIdsMap.keys.any { it.second == i }) { diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DummyEndpoint.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DummyEndpoint.kt index 429b0a078..9d5b06d0a 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DummyEndpoint.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DummyEndpoint.kt @@ -41,8 +41,6 @@ class DummyEndpoint : IEndpointInternal { TODO("Not yet implemented") } - override val metrics: Any - get() = TODO("Not yet implemented") override val throwableFlow: StateFlow = MutableStateFlow(null).asStateFlow() override suspend fun open(descriptor: MediaDescriptor) { diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt index 1951fabc0..483c479e0 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt @@ -25,6 +25,8 @@ import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxer import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.data.TSServiceInfo import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.ContentSink import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.FileSink +import io.github.thibaultbee.streampack.core.elements.metrics.EndpointMetrics +import io.github.thibaultbee.streampack.core.elements.metrics.WithEndpointMetrics import io.github.thibaultbee.streampack.core.elements.utils.ConflatedJob import io.github.thibaultbee.streampack.core.logger.Logger import io.github.thibaultbee.streampack.core.pipelines.IDispatcherProvider @@ -48,7 +50,7 @@ open class DynamicEndpoint( private val context: Context, private val defaultDispatcher: CoroutineDispatcher, private val ioDispatcher: CoroutineDispatcher -) : IEndpointInternal { +) : IEndpointInternal, WithEndpointMetrics { private val coroutineScope = CoroutineScope(defaultDispatcher) private val mutex = Mutex() @@ -84,8 +86,12 @@ open class DynamicEndpoint( override fun getInfo(type: MediaDescriptor.Type) = getEndpoint(type).getInfo(type) - override val metrics: Any - get() = endpoint?.metrics ?: throw IllegalStateException("Endpoint is not opened") + override val metrics: EndpointMetrics<*> + get() { + val endpoint = endpoint ?: throw IllegalStateException("Endpoint is not opened") + return (endpoint as? WithEndpointMetrics)?.metrics + ?: throw UnsupportedOperationException("Current endpoint does not support metrics") + } init { coroutineScope.launch { diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/IEndpoint.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/IEndpoint.kt index 715c7d5c8..8f4fa80be 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/IEndpoint.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/IEndpoint.kt @@ -170,9 +170,4 @@ interface IEndpoint { val supportedEncoders: List } } - - /** - * Metrics of the endpoint. - */ - val metrics: Any } \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/MediaMuxerEndpoint.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/MediaMuxerEndpoint.kt index cedb2462a..0be8a776e 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/MediaMuxerEndpoint.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/MediaMuxerEndpoint.kt @@ -64,9 +64,6 @@ class MediaMuxerEndpoint( override fun getInfo(type: MediaDescriptor.Type) = Companion.getInfo(type) - override val metrics: Any - get() = TODO("Not yet implemented") - private val _isOpenFlow = MutableStateFlow(false) override val isOpenFlow = _isOpenFlow.asStateFlow() diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/CompositeEndpoint.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/CompositeEndpoint.kt index e85726a65..50e0850cf 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/CompositeEndpoint.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/CompositeEndpoint.kt @@ -51,9 +51,6 @@ class CompositeEndpoint( override val info by lazy { EndpointInfo(muxer.info) } override fun getInfo(type: MediaDescriptor.Type) = info - override val metrics: Any - get() = sink.metrics - init { muxer.listener = object : IMuxerInternal.IMuxerListener { diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/AbstractSink.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/AbstractSink.kt index 91ac63a37..de4c45efa 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/AbstractSink.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/AbstractSink.kt @@ -13,9 +13,6 @@ import io.github.thibaultbee.streampack.core.logger.Logger abstract class AbstractSink : ISinkInternal { abstract val supportedSinkTypes: List - override val metrics: Any - get() = TODO("Not yet implemented") - override suspend fun open(mediaDescriptor: MediaDescriptor) { if (isOpenFlow.value) { Logger.w(TAG, "Sink is already opened") diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/ISink.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/ISink.kt index 1e36c9cdd..76463db4a 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/ISink.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/sinks/ISink.kt @@ -45,9 +45,4 @@ interface ISink { * For example, if the file is opened for [FileSink]. */ val isOpenFlow: StateFlow - - /** - * Metrics of the sink. - */ - val metrics: Any } \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/metrics/EndpointMetrics.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/metrics/EndpointMetrics.kt new file mode 100644 index 000000000..764581b34 --- /dev/null +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/metrics/EndpointMetrics.kt @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2026 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.core.elements.metrics + +import kotlin.time.Duration + + +/** + * Endpoint metrics interface + */ +interface EndpointMetrics { + /** + * The duration of the interval + */ + val uptime: Duration + + /** + * The number of packets sent. + */ + val packetsSent: Long + + /** + * The number of packets dropped before sending (e.g. due to congestion or timeout). + */ + val packetsSendDropped: Long + + /** + * The number of packets lost during the transmission. + */ + val packetsSendLost: Long + + /** + * The number of bytes successfully sent. + */ + val bytesSent: Long + + /** + * The number of bytes dropped before sending (e.g. due to congestion or timeout). + */ + val bytesSendDropped: Long + + /** + * The protocol-specific metrics wrapper. + */ + val rawMetrics: T +} + +/** + * The total sent bitrate in bits per second (bps). + */ +val EndpointMetrics<*>.sentBitrateInBps: Long + get() = uptime.inWholeMilliseconds.let { if (it == 0L) 0L else (bytesSent * 8000) / it } + +/** + * A specific [WithMetrics] for [EndpointMetrics]. + */ +interface WithEndpointMetrics : WithMetrics> \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/metrics/WithMetrics.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/metrics/WithMetrics.kt new file mode 100644 index 000000000..cdbec3f8c --- /dev/null +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/metrics/WithMetrics.kt @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2026 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.core.elements.metrics + +/** + * Minimal metrics interface + */ +interface WithMetrics { + /** + * Metrics of the element. + */ + val metrics: T +} diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/sources/video/camera/CameraSettings.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/sources/video/camera/CameraSettings.kt index a1392623a..c6c85a4c1 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/sources/video/camera/CameraSettings.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/sources/video/camera/CameraSettings.kt @@ -56,7 +56,7 @@ import io.github.thibaultbee.streampack.core.elements.sources.video.camera.exten import io.github.thibaultbee.streampack.core.elements.sources.video.camera.extensions.sensitivityRange import io.github.thibaultbee.streampack.core.elements.sources.video.camera.extensions.zoomRatioRange import io.github.thibaultbee.streampack.core.elements.sources.video.camera.utils.CaptureResultListener -import io.github.thibaultbee.streampack.core.elements.utils.extensions.clamp +import io.github.thibaultbee.streampack.core.elements.utils.extensions.coerceIn import io.github.thibaultbee.streampack.core.elements.utils.extensions.isApplicationPortrait import io.github.thibaultbee.streampack.core.elements.utils.extensions.isNormalized import io.github.thibaultbee.streampack.core.elements.utils.extensions.launchIn @@ -533,7 +533,7 @@ class CameraSettings internal constructor( suspend fun setSensorSensitivity(sensorSensitivity: Int) { cameraSettings.set( CaptureRequest.SENSOR_SENSITIVITY, - sensorSensitivity.clamp(availableSensorSensitivityRange) + sensorSensitivity.coerceIn(availableSensorSensitivityRange) ) cameraSettings.applyRepeatingSession() } @@ -693,7 +693,7 @@ class CameraSettings internal constructor( suspend fun setCompensation(compensation: Int) { cameraSettings.set( CaptureRequest.CONTROL_AE_EXPOSURE_COMPENSATION, - compensation.clamp(availableCompensationRange) + compensation.coerceIn(availableCompensationRange) ) cameraSettings.applyRepeatingSession() } @@ -777,7 +777,7 @@ class CameraSettings internal constructor( suspend fun onPinch(scale: Float) { val scaledRatio: Float = getZoomRatio() * speedUpZoomByX(scale, 2) // Clamp the ratio with the zoom range. - setZoomRatio(scaledRatio.clamp(availableRatioRange.lower, availableRatioRange.upper)) + setZoomRatio(scaledRatio.coerceIn(availableRatioRange.lower, availableRatioRange.upper)) } private fun speedUpZoomByX(scaleFactor: Float, ratio: Int): Float { @@ -825,7 +825,7 @@ class CameraSettings internal constructor( @RequiresPermission(Manifest.permission.CAMERA) override suspend fun setZoomRatio(zoomRatio: Float) { mutex.withLock { - val clampedValue = zoomRatio.clamp(availableRatioRange) + val clampedValue = zoomRatio.coerceIn(availableRatioRange) if (clampedValue == persistentZoomRatio) { return@withLock } @@ -905,7 +905,7 @@ class CameraSettings internal constructor( return } cameraSettings.set( - CaptureRequest.CONTROL_ZOOM_RATIO, zoomRatio.clamp(availableRatioRange) + CaptureRequest.CONTROL_ZOOM_RATIO, zoomRatio.coerceIn(availableRatioRange) ) cameraSettings.applyRepeatingSession() notifyZoomListeners(zoomRatio) @@ -1021,7 +1021,7 @@ class CameraSettings internal constructor( @RequiresPermission(Manifest.permission.CAMERA) suspend fun setLensDistance(lensDistance: Float) { cameraSettings.set( - CaptureRequest.LENS_FOCUS_DISTANCE, lensDistance.clamp(availableLensDistanceRange) + CaptureRequest.LENS_FOCUS_DISTANCE, lensDistance.coerceIn(availableLensDistanceRange) ) cameraSettings.applyRepeatingSession() } @@ -1618,10 +1618,10 @@ class CameraSettings internal constructor( (centerY + height / 2).toInt() ) - focusRect.left = focusRect.left.clamp(cropRegion.right, cropRegion.left) - focusRect.right = focusRect.right.clamp(cropRegion.right, cropRegion.left) - focusRect.top = focusRect.top.clamp(cropRegion.bottom, cropRegion.top) - focusRect.bottom = focusRect.bottom.clamp(cropRegion.bottom, cropRegion.top) + focusRect.left = focusRect.left.coerceIn(cropRegion.right, cropRegion.left) + focusRect.right = focusRect.right.coerceIn(cropRegion.right, cropRegion.left) + focusRect.top = focusRect.top.coerceIn(cropRegion.bottom, cropRegion.top) + focusRect.bottom = focusRect.bottom.coerceIn(cropRegion.bottom, cropRegion.top) return MeteringRectangle(focusRect, DEFAULT_METERING_WEIGHT_MAX) } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/ChannelWithCloseableData.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/ChannelWithCloseableData.kt index 130cf4450..35a803271 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/ChannelWithCloseableData.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/ChannelWithCloseableData.kt @@ -34,10 +34,14 @@ import java.io.Closeable */ class ChannelWithCloseableData( capacity: Int = RENDEZVOUS, - onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND + onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, + onUndeliveredElement: ((T) -> Unit) = {} ) : ReceiveChannel> { private val channel = - Channel>(capacity, onBufferOverflow, onUndeliveredElement = { it.close() }) + Channel>(capacity, onBufferOverflow, onUndeliveredElement = { + it.close() + onUndeliveredElement(it.data) + }) /** * Sends data along with a close action to the channel. diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/Extensions.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/Extensions.kt index 5d5fba851..9f59bc79a 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/Extensions.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/Extensions.kt @@ -43,16 +43,16 @@ internal fun Any.numOfBits(): Int { } } -internal fun > T.clamp(min: T, max: T): T { - return if (max >= min) { - if (this < min) min else if (this > max) max else this - } else { - if (this < max) max else if (this > min) min else this +internal fun > T.coerceIn(min: T, max: T): T { + return when { + this < min -> min + this > max -> max + else -> this } } -internal fun > T.clamp(range: Range) = - this.clamp(range.lower, range.upper) +internal fun > T.coerceIn(range: Range) = + this.coerceIn(range.lower, range.upper) internal val PointF.isNormalized: Boolean get() = x in 0f..1f && y in 0f..1f diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/BitrateRegulator.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/BitrateRegulator.kt index ba7c86fb4..6017c7786 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/BitrateRegulator.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/BitrateRegulator.kt @@ -16,6 +16,7 @@ package io.github.thibaultbee.streampack.core.regulator import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig +import io.github.thibaultbee.streampack.core.elements.utils.extensions.coerceIn /** * Abstract class for the bitrate regulation implementation. @@ -29,6 +30,15 @@ import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfi */ abstract class BitrateRegulator( protected val bitrateRegulatorConfig: BitrateRegulatorConfig, - protected val onVideoTargetBitrateChange: ((Int) -> Unit), - protected val onAudioTargetBitrateChange: ((Int) -> Unit) -) : IBitrateRegulator + onVideoTargetBitrateChange: ((Int) -> Unit), + onAudioTargetBitrateChange: ((Int) -> Unit) +) : IBitrateRegulator { + + protected val onVideoTargetBitrateChange: ((Int) -> Unit) = { + onVideoTargetBitrateChange(it.coerceIn(bitrateRegulatorConfig.videoBitrateRange)) + } + + protected val onAudioTargetBitrateChange: ((Int) -> Unit) = { + onAudioTargetBitrateChange(it.coerceIn(bitrateRegulatorConfig.audioBitrateRange)) + } +} diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/IBitrateRegulator.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/IBitrateRegulator.kt index 367af6973..3b4e4a894 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/IBitrateRegulator.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/IBitrateRegulator.kt @@ -16,19 +16,20 @@ package io.github.thibaultbee.streampack.core.regulator import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig +import io.github.thibaultbee.streampack.core.elements.metrics.EndpointMetrics /** * Interface to implement a bitrate regulator. */ interface IBitrateRegulator { /** - * Calls regularly to get new stats + * Called regularly to get new metrics * - * @param stats transmission stats + * @param metrics endpoint metrics * @param currentVideoBitrate current video bitrate target in bits/s. * @param currentAudioBitrate current audio bitrate target in bits/s. */ - fun update(stats: Any, currentVideoBitrate: Int, currentAudioBitrate: Int) + fun update(metrics: EndpointMetrics<*>, currentVideoBitrate: Int, currentAudioBitrate: Int) /** * Factory interface you must use to create a [BitrateRegulator] object. diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/SimpleBitrateRegulator.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/SimpleBitrateRegulator.kt new file mode 100644 index 000000000..0b7c1e4dc --- /dev/null +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/SimpleBitrateRegulator.kt @@ -0,0 +1,93 @@ +package io.github.thibaultbee.streampack.core.regulator + +import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig +import io.github.thibaultbee.streampack.core.elements.metrics.EndpointMetrics +import kotlin.math.max +import kotlin.math.min + +/** + * A [BitrateRegulator] that reduce video bitrate when packets are lost. + * + * @param bitrateRegulatorConfig bitrate regulation configuration + * @param onVideoTargetBitrateChange call when you have to change video bitrate + * @param onAudioTargetBitrateChange call when you have to change audio bitrate + */ +class SimpleBitrateRegulator( + bitrateRegulatorConfig: BitrateRegulatorConfig, + onVideoTargetBitrateChange: ((Int) -> Unit), + onAudioTargetBitrateChange: ((Int) -> Unit) +) : BitrateRegulator( + bitrateRegulatorConfig, + onVideoTargetBitrateChange, + onAudioTargetBitrateChange +) { + companion object { + private const val MINIMUM_DECREASE_THRESHOLD = 100000 // b/s + private const val MAXIMUM_INCREASE_THRESHOLD = 200000 // b/s + } + + /** + * Called regularly to get new endpoint metrics + * + * @param metrics endpoint metrics + * @param currentVideoBitrate current video bitrate target in bits/s. + * @param currentAudioBitrate current audio bitrate target in bits/s. + */ + override fun update( + metrics: EndpointMetrics<*>, + currentVideoBitrate: Int, + currentAudioBitrate: Int + ) { + val packetsLostOrDropped = metrics.packetsSendDropped + metrics.packetsSendLost + if (packetsLostOrDropped > 0) { + // Detected packet dropped or loss - we should reduce the bitrate> + // How critical? + val percentageReduction = if (metrics.packetsSent == 0L) { + 50 + } else { + (packetsLostOrDropped * 100 / metrics.packetsSent).toInt().coerceIn(5, 50) + } + + // Reduce current bitrate by percentageReduction % + val newVideoBitrate = currentVideoBitrate - max( + currentVideoBitrate * percentageReduction / 100, + MINIMUM_DECREASE_THRESHOLD // getting down by 100000 b/s minimum + ) + onVideoTargetBitrateChange(newVideoBitrate) + } else if (currentVideoBitrate < (bitrateRegulatorConfig.videoBitrateRange.upper * 90 / 100)) { + // Try to increase to the max target + val newVideoBitrate = currentVideoBitrate + min( + (bitrateRegulatorConfig.videoBitrateRange.upper - currentVideoBitrate) * 50 / 100, // getting slower when reaching target bitrate + MAXIMUM_INCREASE_THRESHOLD // not increasing to fast + ) + onVideoTargetBitrateChange(newVideoBitrate) + } + } + + /** + * Factory interface you must use to create a [SimpleBitrateRegulator] object. + * If you want to create a custom RTMP bitrate regulation implementation, create a factory that + * implements this interface. + */ + class Factory : IBitrateRegulator.Factory { + /** + * Creates a [SimpleBitrateRegulator] object from given parameters + * + * @param bitrateRegulatorConfig bitrate regulation configuration + * @param onVideoTargetBitrateChange call when you have to change video bitrate + * @param onAudioTargetBitrateChange call when you have to change audio bitrate + * @return a [SimpleBitrateRegulator] object + */ + override fun newBitrateRegulator( + bitrateRegulatorConfig: BitrateRegulatorConfig, + onVideoTargetBitrateChange: ((Int) -> Unit), + onAudioTargetBitrateChange: ((Int) -> Unit) + ): SimpleBitrateRegulator { + return SimpleBitrateRegulator( + bitrateRegulatorConfig, + onVideoTargetBitrateChange, + onAudioTargetBitrateChange + ) + } + } +} \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/BitrateRegulatorController.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/BitrateRegulatorController.kt index 2ba195240..af5a1ecb0 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/BitrateRegulatorController.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/BitrateRegulatorController.kt @@ -17,7 +17,7 @@ package io.github.thibaultbee.streampack.core.regulator.controllers import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig import io.github.thibaultbee.streampack.core.elements.encoders.IEncoder -import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpoint +import io.github.thibaultbee.streampack.core.elements.metrics.WithEndpointMetrics import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IEncodingPipelineOutput import io.github.thibaultbee.streampack.core.regulator.IBitrateRegulator import kotlinx.coroutines.CoroutineDispatcher @@ -27,14 +27,14 @@ import kotlinx.coroutines.CoroutineDispatcher * * @param audioEncoder the audio [IEncoder] * @param videoEncoder the video [IEncoder] - * @param endpoint the [IEndpoint] implementation + * @param metricsProvider the [WithEndpointMetrics] implementation * @param bitrateRegulatorFactory the [IBitrateRegulator.Factory] implementation. Use it to make your own bitrate regulator. * @param bitrateRegulatorConfig bitrate regulator configuration */ abstract class BitrateRegulatorController( private val audioEncoder: IEncoder?, private val videoEncoder: IEncoder?, - private val endpoint: IEndpoint, + private val metricsProvider: WithEndpointMetrics, private val bitrateRegulatorFactory: IBitrateRegulator.Factory, private val bitrateRegulatorConfig: BitrateRegulatorConfig = BitrateRegulatorConfig() ) : IBitrateRegulatorController { diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/BitrateRegulatorControllers.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/BitrateRegulatorControllers.kt new file mode 100644 index 000000000..634d4b104 --- /dev/null +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/BitrateRegulatorControllers.kt @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2026 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.core.regulator.controllers + +import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig +import io.github.thibaultbee.streampack.core.regulator.IBitrateRegulator +import io.github.thibaultbee.streampack.core.regulator.SimpleBitrateRegulator +import io.github.thibaultbee.streampack.core.regulator.controllers.IntervalBitrateRegulatorController.Companion.DEFAULT_POLLING_TIME_IN_MS +import io.github.thibaultbee.streampack.core.regulator.controllers.IntervalBitrateRegulatorController.Factory + +/** + * A [IntervalBitrateRegulatorController.Factory] for [IBitrateRegulator]. + * + * @param bitrateRegulatorFactory the [IBitrateRegulator.Factory] implementation. Use it to make your own bitrate regulator. + * @param bitrateRegulatorConfig bitrate regulator configuration + * @param pollingTimeInMs delay between each call to [IBitrateRegulator.update] + */ +fun intervalRtmpBitrateRegulatorControllerFactory( + bitrateRegulatorFactory: IBitrateRegulator.Factory = SimpleBitrateRegulator.Factory(), + bitrateRegulatorConfig: BitrateRegulatorConfig = BitrateRegulatorConfig(), + pollingTimeInMs: Long = DEFAULT_POLLING_TIME_IN_MS +) = Factory( + bitrateRegulatorFactory, bitrateRegulatorConfig, pollingTimeInMs +) diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/SimpleBitrateRegulatorController.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/IntervalBitrateRegulatorController.kt similarity index 88% rename from core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/SimpleBitrateRegulatorController.kt rename to core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/IntervalBitrateRegulatorController.kt index cb39e2d3d..6e17e9ba3 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/SimpleBitrateRegulatorController.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/regulator/controllers/IntervalBitrateRegulatorController.kt @@ -17,7 +17,7 @@ package io.github.thibaultbee.streampack.core.regulator.controllers import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig import io.github.thibaultbee.streampack.core.elements.encoders.IEncoder -import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpoint +import io.github.thibaultbee.streampack.core.elements.metrics.WithEndpointMetrics import io.github.thibaultbee.streampack.core.elements.utils.CoroutineScheduler import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IConfigurableAudioEncodingPipelineOutput import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IConfigurableVideoEncodingPipelineOutput @@ -30,15 +30,15 @@ import kotlinx.coroutines.CoroutineDispatcher * * @param audioEncoder the audio [IEncoder] * @param videoEncoder the video [IEncoder] - * @param endpoint the [IEndpoint] implementation + * @param metricsProvider the [WithEndpointMetrics] implementation * @param bitrateRegulatorFactory the [IBitrateRegulator.Factory] implementation. Use it to make your own bitrate regulator. * @param bitrateRegulatorConfig bitrate regulator configuration * @param pollingTimeInMs delay between each call to [IBitrateRegulator.update] */ -open class SimpleBitrateRegulatorController( +class IntervalBitrateRegulatorController( audioEncoder: IEncoder?, videoEncoder: IEncoder, - endpoint: IEndpoint, + metricsProvider: WithEndpointMetrics, bitrateRegulatorFactory: IBitrateRegulator.Factory, coroutineDispatcher: CoroutineDispatcher, bitrateRegulatorConfig: BitrateRegulatorConfig = BitrateRegulatorConfig(), @@ -46,7 +46,7 @@ open class SimpleBitrateRegulatorController( ) : BitrateRegulatorController( audioEncoder, videoEncoder, - endpoint, + metricsProvider, bitrateRegulatorFactory, bitrateRegulatorConfig ) { @@ -55,10 +55,10 @@ open class SimpleBitrateRegulatorController( */ private val bitrateRegulator = bitrateRegulatorFactory.newBitrateRegulator( bitrateRegulatorConfig, - { + onVideoTargetBitrateChange = { videoEncoder.bitrate = it }, - { /* Do nothing for audio */ } + onAudioTargetBitrateChange = { /* Do nothing for audio */ } ) /** @@ -66,7 +66,7 @@ open class SimpleBitrateRegulatorController( */ private val scheduler = CoroutineScheduler(pollingTimeInMs, coroutineDispatcher) { bitrateRegulator.update( - endpoint.metrics, + metricsProvider.metrics, videoEncoder.bitrate, audioEncoder?.bitrate ?: 0 ) @@ -106,10 +106,11 @@ open class SimpleBitrateRegulatorController( } else { null } - return SimpleBitrateRegulatorController( + val endpoint = pipelineOutput.endpoint as WithEndpointMetrics + return IntervalBitrateRegulatorController( audioEncoder, videoEncoder, - pipelineOutput.endpoint, + endpoint, bitrateRegulatorFactory, coroutineDispatcher, bitrateRegulatorConfig, diff --git a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/data/storage/DataStoreRepository.kt b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/data/storage/DataStoreRepository.kt index ddab33b3d..a170631d3 100644 --- a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/data/storage/DataStoreRepository.kt +++ b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/data/storage/DataStoreRepository.kt @@ -164,7 +164,7 @@ class DataStoreRepository( EndpointType.RTMP -> { val url = preferences[stringPreferencesKey(context.getString(R.string.rtmp_server_url_key))] - ?: context.getString(R.string.default_rtmp_url) + ?: context.getString(R.string.rtmp_default_url) UriMediaDescriptor(context, url) } } @@ -172,19 +172,71 @@ class DataStoreRepository( val bitrateRegulatorConfigFlow: Flow = dataStore.data.map { preferences -> + val endpointTypeId = + preferences[stringPreferencesKey(context.getString(R.string.endpoint_type_key))]?.toInt() + ?: EndpointType.SRT.id + val isBitrateRegulatorEnable = - preferences[booleanPreferencesKey(context.getString(R.string.srt_server_enable_bitrate_regulation_key))] + preferences[booleanPreferencesKey( + context.getString( + when (endpointTypeId) { + EndpointType.SRT.id -> { + R.string.srt_server_enable_bitrate_regulation_key + } + + EndpointType.RTMP.id -> { + R.string.rtmp_server_enable_bitrate_regulation_key + } + + else -> { + throw IllegalArgumentException("Unknown endpoint type") + } + } + ) + )] ?: true if (!isBitrateRegulatorEnable) { return@map null } val videoMinBitrate = - preferences[intPreferencesKey(context.getString(R.string.srt_server_video_min_bitrate_key))]?.toInt() + preferences[intPreferencesKey( + context.getString( + when (endpointTypeId) { + EndpointType.SRT.id -> { + R.string.srt_server_video_min_bitrate_key + } + + EndpointType.RTMP.id -> { + R.string.rtmp_server_video_min_bitrate_key + } + + else -> { + throw IllegalArgumentException("Unknown endpoint type") + } + } + ) + )] ?.times(1000) ?: 300000 val videoMaxBitrate = - preferences[intPreferencesKey(context.getString(R.string.srt_server_video_target_bitrate_key))]?.toInt() + preferences[intPreferencesKey( + context.getString( + when (endpointTypeId) { + EndpointType.SRT.id -> { + R.string.srt_server_video_target_bitrate_key + } + + EndpointType.RTMP.id -> { + R.string.rtmp_server_video_target_bitrate_key + } + + else -> { + throw IllegalArgumentException("Unknown endpoint type") + } + } + ) + )] ?.times(1000) ?: 10000000 BitrateRegulatorConfig( diff --git a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/main/PreviewViewModel.kt b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/main/PreviewViewModel.kt index 0b3dbd592..2253390cd 100644 --- a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/main/PreviewViewModel.kt +++ b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/main/PreviewViewModel.kt @@ -65,7 +65,8 @@ import io.github.thibaultbee.streampack.core.streamers.single.VideoOnlySingleStr import io.github.thibaultbee.streampack.core.streamers.single.withAudio import io.github.thibaultbee.streampack.core.streamers.single.withVideo import io.github.thibaultbee.streampack.core.utils.extensions.isClosedException -import io.github.thibaultbee.streampack.ext.srt.regulator.controllers.simpleSrtBitrateRegulatorControllerFactory +import io.github.thibaultbee.streampack.core.regulator.controllers.intervalRtmpBitrateRegulatorControllerFactory +import io.github.thibaultbee.streampack.ext.srt.regulator.controllers.intervalSrtBitrateRegulatorControllerFactory import kotlinx.coroutines.CancellationException import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job @@ -312,16 +313,32 @@ class PreviewViewModel(private val application: Application) : ObservableViewMod val descriptor = storageRepository.endpointDescriptorFlow.first() streamer.startStream(descriptor) - if (descriptor.type.sinkType == MediaSinkType.SRT) { + if ((descriptor.type.sinkType == MediaSinkType.RTMP) || (descriptor.type.sinkType == MediaSinkType.SRT)) { val bitrateRegulatorConfig = storageRepository.bitrateRegulatorConfigFlow.first() if (bitrateRegulatorConfig != null) { Log.i(TAG, "Add bitrate regulator controller") - streamer.addBitrateRegulatorController( - simpleSrtBitrateRegulatorControllerFactory( - bitrateRegulatorConfig = bitrateRegulatorConfig - ) - ) + val controllerFactory = + when (descriptor.type.sinkType) { + MediaSinkType.RTMP -> { + intervalRtmpBitrateRegulatorControllerFactory( + bitrateRegulatorConfig = bitrateRegulatorConfig + ) + } + + MediaSinkType.SRT -> { + intervalSrtBitrateRegulatorControllerFactory( + bitrateRegulatorConfig = bitrateRegulatorConfig + ) + } + + else -> { + null + } + } + controllerFactory?.let { + streamer.addBitrateRegulatorController(it) + } ?: Log.e(TAG, "Controller factory is null") } } } catch (e: CancellationException) { diff --git a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/settings/SettingsFragment.kt b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/settings/SettingsFragment.kt index 2a8079eda..04a3b1b1b 100644 --- a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/settings/SettingsFragment.kt +++ b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/ui/settings/SettingsFragment.kt @@ -131,18 +131,30 @@ class SettingsFragment : PreferenceFragmentCompat() { this.findPreference(getString(R.string.srt_server_port_key))!! } - private val serverEnableBitrateRegulationPreference: SwitchPreference by lazy { + private val srtServerEnableBitrateRegulationPreference: SwitchPreference by lazy { this.findPreference(getString(R.string.srt_server_enable_bitrate_regulation_key))!! } - private val serverTargetVideoBitratePreference: SeekBarPreference by lazy { + private val srtServerTargetVideoBitratePreference: SeekBarPreference by lazy { this.findPreference(getString(R.string.srt_server_video_target_bitrate_key))!! } - private val serverMinVideoBitratePreference: SeekBarPreference by lazy { + private val srtServerMinVideoBitratePreference: SeekBarPreference by lazy { this.findPreference(getString(R.string.srt_server_video_min_bitrate_key))!! } + private val rtmpServerEnableBitrateRegulationPreference: SwitchPreference by lazy { + this.findPreference(getString(R.string.rtmp_server_enable_bitrate_regulation_key))!! + } + + private val rtmpServerTargetVideoBitratePreference: SeekBarPreference by lazy { + this.findPreference(getString(R.string.rtmp_server_video_target_bitrate_key))!! + } + + private val rtmpServerMinVideoBitratePreference: SeekBarPreference by lazy { + this.findPreference(getString(R.string.rtmp_server_video_min_bitrate_key))!! + } + private val fileNamePreference: EditTextPreference by lazy { this.findPreference(getString(R.string.file_name_key))!! } @@ -417,26 +429,50 @@ class SettingsFragment : PreferenceFragmentCompat() { editText.filters = arrayOf(InputFilter.LengthFilter(5)) } - serverTargetVideoBitratePreference.isVisible = - serverEnableBitrateRegulationPreference.isChecked - serverMinVideoBitratePreference.isVisible = - serverEnableBitrateRegulationPreference.isChecked - serverEnableBitrateRegulationPreference.setOnPreferenceChangeListener { _, newValue -> - serverTargetVideoBitratePreference.isVisible = newValue as Boolean - serverMinVideoBitratePreference.isVisible = newValue + srtServerTargetVideoBitratePreference.isVisible = + srtServerEnableBitrateRegulationPreference.isChecked + srtServerMinVideoBitratePreference.isVisible = + srtServerEnableBitrateRegulationPreference.isChecked + srtServerEnableBitrateRegulationPreference.setOnPreferenceChangeListener { _, newValue -> + srtServerTargetVideoBitratePreference.isVisible = newValue as Boolean + srtServerMinVideoBitratePreference.isVisible = newValue + true + } + + srtServerTargetVideoBitratePreference.setOnPreferenceChangeListener { _, newValue -> + if ((newValue as Int) < srtServerMinVideoBitratePreference.value) { + srtServerMinVideoBitratePreference.value = newValue + } + true + } + + srtServerMinVideoBitratePreference.setOnPreferenceChangeListener { _, newValue -> + if ((newValue as Int) > srtServerTargetVideoBitratePreference.value) { + srtServerTargetVideoBitratePreference.value = newValue + } + true + } + + rtmpServerTargetVideoBitratePreference.isVisible = + rtmpServerEnableBitrateRegulationPreference.isChecked + rtmpServerMinVideoBitratePreference.isVisible = + rtmpServerEnableBitrateRegulationPreference.isChecked + rtmpServerEnableBitrateRegulationPreference.setOnPreferenceChangeListener { _, newValue -> + rtmpServerTargetVideoBitratePreference.isVisible = newValue as Boolean + rtmpServerMinVideoBitratePreference.isVisible = newValue true } - serverTargetVideoBitratePreference.setOnPreferenceChangeListener { _, newValue -> - if ((newValue as Int) < serverMinVideoBitratePreference.value) { - serverMinVideoBitratePreference.value = newValue + rtmpServerTargetVideoBitratePreference.setOnPreferenceChangeListener { _, newValue -> + if ((newValue as Int) < rtmpServerMinVideoBitratePreference.value) { + rtmpServerMinVideoBitratePreference.value = newValue } true } - serverMinVideoBitratePreference.setOnPreferenceChangeListener { _, newValue -> - if ((newValue as Int) > serverTargetVideoBitratePreference.value) { - serverTargetVideoBitratePreference.value = newValue + rtmpServerMinVideoBitratePreference.setOnPreferenceChangeListener { _, newValue -> + if ((newValue as Int) > rtmpServerTargetVideoBitratePreference.value) { + rtmpServerTargetVideoBitratePreference.value = newValue } true } @@ -459,7 +495,7 @@ class SettingsFragment : PreferenceFragmentCompat() { // Update file extension if (endpoint.hasFileCapabilities) { // Remove previous extension - FileExtension.entries.forEach { + FileExtension.entries.forEach { _ -> fileNamePreference.text = fileNamePreference.text?.substringBeforeLast(".") } // Add correct extension diff --git a/demos/camera/src/main/res/values/strings.xml b/demos/camera/src/main/res/values/strings.xml index 1a8393963..1fa81cee8 100644 --- a/demos/camera/src/main/res/values/strings.xml +++ b/demos/camera/src/main/res/values/strings.xml @@ -127,7 +127,14 @@ rtmp_server_key RTMP Server rtmp_server_url_key - rtmp://192.168.1.192/s/streamKey + rtmp://192.168.1.192/s/streamKey + rtmp_server_enable_bitrate_regulation_key + Enable bitrate regulation + rtmp_server_video_target_bitrate_key + Video target bitrate (kb/s) + rtmp_server_video_min_bitrate_key + Video minimum bitrate (kb/s) + URL file_endpoint_key diff --git a/demos/camera/src/main/res/xml/root_preferences.xml b/demos/camera/src/main/res/xml/root_preferences.xml index e47de9ad3..58a3a89b7 100644 --- a/demos/camera/src/main/res/xml/root_preferences.xml +++ b/demos/camera/src/main/res/xml/root_preferences.xml @@ -167,11 +167,33 @@ app:title="@string/rtmp_server"> + + + + + = _isOpenFlow.asStateFlow() diff --git a/extensions/rtmp/build.gradle.kts b/extensions/rtmp/build.gradle.kts index beca775cf..1be6ad482 100644 --- a/extensions/rtmp/build.gradle.kts +++ b/extensions/rtmp/build.gradle.kts @@ -14,7 +14,7 @@ dependencies { implementation(project(":streampack-core")) implementation(project(":streampack-flv")) - implementation(libs.komedia.komuxer.rtmp) + api(libs.komedia.komuxer.rtmp) implementation(libs.kotlinx.coroutines.android) implementation(libs.androidx.core.ktx) diff --git a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/elements/endpoints/RtmpEndpoint.kt b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/elements/endpoints/RtmpEndpoint.kt index bee91e202..096d1d950 100644 --- a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/elements/endpoints/RtmpEndpoint.kt +++ b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/elements/endpoints/RtmpEndpoint.kt @@ -16,11 +16,15 @@ package io.github.thibaultbee.streampack.ext.rtmp.elements.endpoints import android.content.Context +import io.github.komedia.komuxer.amf.AmfVersion import io.github.komedia.komuxer.flv.tags.FLVTag +import io.github.komedia.komuxer.flv.tags.audio.AudioData +import io.github.komedia.komuxer.flv.tags.video.VideoData import io.github.komedia.komuxer.rtmp.RtmpConnectionBuilder import io.github.komedia.komuxer.rtmp.client.RtmpClient import io.github.komedia.komuxer.rtmp.connect import io.github.komedia.komuxer.rtmp.messages.command.StreamPublishType +import io.github.komedia.komuxer.rtmp.util.metrics.RtmpMetrics import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor import io.github.thibaultbee.streampack.core.elements.data.Frame import io.github.thibaultbee.streampack.core.elements.encoders.CodecConfig @@ -28,6 +32,7 @@ import io.github.thibaultbee.streampack.core.elements.endpoints.ClosedException import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpoint import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpointInternal import io.github.thibaultbee.streampack.core.elements.endpoints.composites.CompositeEndpoint.EndpointInfo +import io.github.thibaultbee.streampack.core.elements.metrics.WithEndpointMetrics import io.github.thibaultbee.streampack.core.elements.utils.ChannelWithCloseableData import io.github.thibaultbee.streampack.core.elements.utils.useConsumeEach import io.github.thibaultbee.streampack.core.logger.Logger @@ -35,6 +40,7 @@ import io.github.thibaultbee.streampack.core.pipelines.IDispatcherProvider import io.github.thibaultbee.streampack.ext.flv.elements.endpoints.composites.muxer.FlvMuxerInfo import io.github.thibaultbee.streampack.ext.flv.elements.endpoints.composites.muxer.utils.FlvTagBuilder import io.github.thibaultbee.streampack.ext.flv.elements.endpoints.composites.muxer.utils.close +import io.github.thibaultbee.streampack.ext.rtmp.utils.RtmpEndpointMetrics import io.ktor.network.selector.SelectorManager import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope @@ -51,18 +57,43 @@ import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext import kotlinx.io.EOFException import java.io.IOException +import kotlin.concurrent.atomics.ExperimentalAtomicApi /** * An endpoint that send frame to an RTMP server. */ +@OptIn(ExperimentalAtomicApi::class) class RtmpEndpoint internal constructor( defaultDispatcher: CoroutineDispatcher, val ioDispatcher: CoroutineDispatcher -) : IEndpointInternal { +) : IEndpointInternal, WithEndpointMetrics { private val coroutineScope = CoroutineScope(SupervisorJob() + defaultDispatcher) private val mutex = Mutex() + private val metricsLock = Any() + private var frameDropped = 0L + private var audioFrameDropped = 0L + private var videoFrameDropped = 0L + + private var payloadSendDroppedSize = 0L + private var audioPayloadSendDroppedSize = 0L + private var videoPayloadSendDroppedSize = 0L + private val flvTagChannel = ChannelWithCloseableData( - 10 /* Arbitrary buffer size. TODO: add a parameter to set it */, BufferOverflow.DROP_OLDEST + 10 /* Arbitrary buffer size. TODO: add a parameter to set it */, BufferOverflow.DROP_OLDEST, + onUndeliveredElement = { flvTag -> + synchronized(metricsLock) { + val payloadSize = flvTag.data.getSize(AmfVersion.AMF0) + frameDropped++ + payloadSendDroppedSize += payloadSize + if (flvTag.data is AudioData) { + audioFrameDropped++ + audioPayloadSendDroppedSize += payloadSize + } else if (flvTag.data is VideoData) { + videoFrameDropped++ + videoPayloadSendDroppedSize += payloadSize + } + } + } ) private val flvTagBuilder = FlvTagBuilder(flvTagChannel) @@ -70,12 +101,27 @@ class RtmpEndpoint internal constructor( private val connectionBuilder = RtmpConnectionBuilder(selectorManager) private var rtmpClient: RtmpClient? = null - private var startUpTimestamp = INVALID_TIMESTAMP private val timestampMutex = Mutex() - override val metrics: Any - get() = TODO("Not yet implemented") + private val syncMetrics: RtmpMetrics + get() { + val metrics = + rtmpClient?.metrics ?: return RtmpMetrics.ZERO + return synchronized(metricsLock) { + metrics.copy( + messagesSendDropped = metrics.messagesSendDropped + frameDropped, + audioMessagesSendDropped = metrics.audioMessagesSendDropped + audioFrameDropped, + videoMessagesSendDropped = metrics.videoMessagesSendDropped + videoFrameDropped, + payloadSendDroppedSize = metrics.payloadSendDroppedSize + payloadSendDroppedSize, + audioPayloadSendDroppedSize = metrics.audioPayloadSendDroppedSize + audioPayloadSendDroppedSize, + videoPayloadSendDroppedSize = metrics.videoPayloadSendDroppedSize + videoPayloadSendDroppedSize + ) + } + } + + override val metrics: RtmpEndpointMetrics + get() = RtmpEndpointMetrics(syncMetrics) private val _isOpenFlow = MutableStateFlow(false) override val isOpenFlow = _isOpenFlow.asStateFlow() @@ -95,6 +141,18 @@ class RtmpEndpoint internal constructor( } } + private fun resetMetrics() { + synchronized(metricsLock) { + frameDropped = 0L + audioFrameDropped = 0L + videoFrameDropped = 0L + + payloadSendDroppedSize = 0L + audioPayloadSendDroppedSize = 0L + videoPayloadSendDroppedSize = 0L + } + } + private suspend fun safeClient(block: suspend (RtmpClient) -> T): T { val rtmpClient = requireNotNull(rtmpClient) { "Not opened" } require(!rtmpClient.isClosed) { "Connection closed" } @@ -109,6 +167,7 @@ class RtmpEndpoint internal constructor( return@withContext } + resetMetrics() rtmpClient = connectionBuilder.connect(descriptor.uri.toString()).apply { _isOpenFlow.emit(true) diff --git a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/RtmpBitrateRegulator.kt b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/RtmpBitrateRegulator.kt new file mode 100644 index 000000000..e85146ca9 --- /dev/null +++ b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/regulator/RtmpBitrateRegulator.kt @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2026 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.ext.rtmp.regulator + +import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig +import io.github.thibaultbee.streampack.core.elements.metrics.EndpointMetrics +import io.github.thibaultbee.streampack.core.regulator.BitrateRegulator +import io.github.thibaultbee.streampack.core.regulator.IBitrateRegulator +import io.github.thibaultbee.streampack.ext.rtmp.utils.RtmpEndpointMetrics + +/** + * Base class of RTMP bitrate regulation implementation. + * + * If you want to implement your custom bitrate regulator, it must inherit from this class. + * The bitrate regulator object is created by streamers with the [IBitrateRegulator.Factory]. + * + * @param bitrateRegulatorConfig bitrate regulation configuration + * @param onVideoTargetBitrateChange call when you have to change video bitrate + * @param onAudioTargetBitrateChange call when you have to change audio bitrate + */ +abstract class RtmpBitrateRegulator( + bitrateRegulatorConfig: BitrateRegulatorConfig, + onVideoTargetBitrateChange: ((Int) -> Unit), + onAudioTargetBitrateChange: ((Int) -> Unit) +) : BitrateRegulator( + bitrateRegulatorConfig, + onVideoTargetBitrateChange, + onAudioTargetBitrateChange +) { + /** + * Called regularly to get new endpoint metrics + * + * @param metrics endpoint metrics + * @param currentVideoBitrate current video bitrate target in bits/s. + * @param currentAudioBitrate current audio bitrate target in bits/s. + */ + override fun update( + metrics: EndpointMetrics<*>, + currentVideoBitrate: Int, + currentAudioBitrate: Int + ) { + update(metrics as RtmpEndpointMetrics, currentVideoBitrate, currentAudioBitrate) + } + + /** + * Called regularly to get new RTMP metrics + * + * @param metrics RTMP endpoint metrics + * @param currentVideoBitrate current video bitrate target in bits/s. + * @param currentAudioBitrate current audio bitrate target in bits/s. + */ + abstract fun update( + metrics: RtmpEndpointMetrics, + currentVideoBitrate: Int, + currentAudioBitrate: Int + ) + + /** + * Factory interface you must use to create a [RtmpBitrateRegulator] object. + * If you want to create a custom RTMP bitrate regulation implementation, create a factory that + * implements this interface. + */ + interface Factory : IBitrateRegulator.Factory { + /** + * Creates a [RtmpBitrateRegulator] object from given parameters + * + * @param bitrateRegulatorConfig bitrate regulation configuration + * @param onVideoTargetBitrateChange call when you have to change video bitrate + * @param onAudioTargetBitrateChange call when you have to change audio bitrate + * @return a [RtmpBitrateRegulator] object + */ + override fun newBitrateRegulator( + bitrateRegulatorConfig: BitrateRegulatorConfig, + onVideoTargetBitrateChange: ((Int) -> Unit), + onAudioTargetBitrateChange: ((Int) -> Unit) + ): RtmpBitrateRegulator + } +} \ No newline at end of file diff --git a/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/utils/RtmpEndpointMetrics.kt b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/utils/RtmpEndpointMetrics.kt new file mode 100644 index 000000000..662a380b2 --- /dev/null +++ b/extensions/rtmp/src/main/java/io/github/thibaultbee/streampack/ext/rtmp/utils/RtmpEndpointMetrics.kt @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2026 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.ext.rtmp.utils + +import io.github.komedia.komuxer.rtmp.util.metrics.RtmpMetrics +import io.github.thibaultbee.streampack.core.elements.metrics.EndpointMetrics +import kotlin.time.Duration + +/** + * Creates a [RtmpEndpointMetrics] from a [RtmpMetrics]. + */ +fun RtmpEndpointMetrics(rawMetrics: RtmpMetrics): RtmpEndpointMetrics { + return RtmpEndpointMetrics( + uptime = rawMetrics.uptime, + packetsSent = rawMetrics.messagesSent, + packetsSendDropped = rawMetrics.messagesSendDropped, + packetsSendLost = 0L, + bytesSent = rawMetrics.totalBytesSent, + bytesSendDropped = rawMetrics.payloadSendDroppedSize, + rawMetrics = rawMetrics + ) +} + +/** + * Specific [EndpointMetrics] for RTMP protocol, based on [RtmpMetrics]. + */ +data class RtmpEndpointMetrics( + override val uptime: Duration, + override val packetsSent: Long, + override val packetsSendDropped: Long, + override val packetsSendLost: Long, + override val bytesSent: Long, + override val bytesSendDropped: Long, + override val rawMetrics: RtmpMetrics +) : EndpointMetrics diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/SrtEndpointFactory.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/SrtEndpointFactory.kt index 920181103..3c05230d9 100644 --- a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/SrtEndpointFactory.kt +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/SrtEndpointFactory.kt @@ -1,3 +1,18 @@ +/* + * Copyright (C) 2026 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.github.thibaultbee.streampack.ext.srt.elements.endpoints import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.createDefaultTsServiceInfo diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/composites/sinks/SrtSink.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/composites/sinks/SrtSink.kt index f5659ffee..6480014b2 100644 --- a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/composites/sinks/SrtSink.kt +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/elements/endpoints/composites/sinks/SrtSink.kt @@ -20,7 +20,6 @@ import io.github.thibaultbee.srtdroid.core.enums.SockOpt import io.github.thibaultbee.srtdroid.core.enums.Transtype import io.github.thibaultbee.srtdroid.core.models.MsgCtrl import io.github.thibaultbee.srtdroid.core.models.SrtUrl.Mode -import io.github.thibaultbee.srtdroid.core.models.Stats import io.github.thibaultbee.srtdroid.ktx.CoroutineSrtSocket import io.github.thibaultbee.srtdroid.ktx.connect import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor @@ -30,13 +29,17 @@ import io.github.thibaultbee.streampack.core.elements.endpoints.composites.data. import io.github.thibaultbee.streampack.core.elements.endpoints.composites.data.SrtPacket import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.AbstractSink import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.SinkConfiguration +import io.github.thibaultbee.streampack.core.elements.metrics.WithEndpointMetrics import io.github.thibaultbee.streampack.core.logger.Logger import io.github.thibaultbee.streampack.ext.srt.configuration.mediadescriptor.SrtMediaDescriptor +import io.github.thibaultbee.streampack.ext.srt.utils.SrtRawMetrics +import io.github.thibaultbee.streampack.ext.srt.utils.SrtEndpointMetrics import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asStateFlow -class SrtSink(private val coroutineDispatcher: CoroutineDispatcher) : AbstractSink() { +class SrtSink(private val coroutineDispatcher: CoroutineDispatcher) : AbstractSink(), + WithEndpointMetrics { override val supportedSinkTypes: List = listOf(MediaSinkType.SRT) private var socket: CoroutineSrtSocket? = null @@ -45,12 +48,13 @@ class SrtSink(private val coroutineDispatcher: CoroutineDispatcher) : AbstractSi private var bitrate = 0L + private val srtRawMetrics = SrtRawMetrics { socket } + /** * Get SRT stats */ - override val metrics: Stats - get() = socket?.bistats(clear = true, instantaneous = true) - ?: throw IllegalStateException("Socket is not initialized") + override val metrics: SrtEndpointMetrics + get() = SrtEndpointMetrics(srtRawMetrics) private val _isOpenFlow = MutableStateFlow(false) override val isOpenFlow = _isOpenFlow.asStateFlow() diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/DummySrtBitrateRegulator.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/DummySrtBitrateRegulator.kt index 65dfd6916..21e48e587 100644 --- a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/DummySrtBitrateRegulator.kt +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/DummySrtBitrateRegulator.kt @@ -15,8 +15,8 @@ */ package io.github.thibaultbee.streampack.ext.srt.regulator -import io.github.thibaultbee.srtdroid.core.models.Stats import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig +import io.github.thibaultbee.streampack.ext.srt.utils.SrtEndpointMetrics import kotlin.math.max import kotlin.math.min @@ -42,7 +42,8 @@ class DummySrtBitrateRegulator( const val SEND_PACKET_THRESHOLD = 50 } - override fun update(stats: Stats, currentVideoBitrate: Int, currentAudioBitrate: Int) { + override fun update(metrics: SrtEndpointMetrics, currentVideoBitrate: Int, currentAudioBitrate: Int) { + val stats = metrics.rawMetrics.bistatsOrNull(clear = true, instantaneous = true) ?: return val estimatedBandwidth = (stats.mbpsBandwidth * 1000000).toInt() if (currentVideoBitrate > bitrateRegulatorConfig.videoBitrateRange.lower) { @@ -72,18 +73,11 @@ class DummySrtBitrateRegulator( } if (newVideoBitrate != 0) { - onVideoTargetBitrateChange( - max( - newVideoBitrate, - bitrateRegulatorConfig.videoBitrateRange.lower - ) - ) // Don't go under videoBitrateRange.lower + onVideoTargetBitrateChange(newVideoBitrate) return } - } - - // Can bitrate go upper? - if (currentVideoBitrate < bitrateRegulatorConfig.videoBitrateRange.upper) { + // Can bitrate go upper? + } else if (currentVideoBitrate < bitrateRegulatorConfig.videoBitrateRange.upper) { val newVideoBitrate = when { (currentVideoBitrate + currentAudioBitrate) < estimatedBandwidth -> { currentVideoBitrate + min( @@ -96,12 +90,7 @@ class DummySrtBitrateRegulator( } if (newVideoBitrate != 0) { - onVideoTargetBitrateChange( - max( - newVideoBitrate, - bitrateRegulatorConfig.videoBitrateRange.lower - ) - ) // Don't go under videoBitrateRange.lower + onVideoTargetBitrateChange(newVideoBitrate) return } } diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/SrtBitrateRegulator.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/SrtBitrateRegulator.kt index dbfb53a84..d49647765 100644 --- a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/SrtBitrateRegulator.kt +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/SrtBitrateRegulator.kt @@ -15,10 +15,11 @@ */ package io.github.thibaultbee.streampack.ext.srt.regulator -import io.github.thibaultbee.srtdroid.core.models.Stats import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig +import io.github.thibaultbee.streampack.core.elements.metrics.EndpointMetrics import io.github.thibaultbee.streampack.core.regulator.BitrateRegulator import io.github.thibaultbee.streampack.core.regulator.IBitrateRegulator +import io.github.thibaultbee.streampack.ext.srt.utils.SrtEndpointMetrics /** * Base class of SRT bitrate regulation implementation. @@ -39,17 +40,29 @@ abstract class SrtBitrateRegulator( onVideoTargetBitrateChange, onAudioTargetBitrateChange ) { - override fun update(stats: Any, currentVideoBitrate: Int, currentAudioBitrate: Int) = - update(stats as Stats, currentVideoBitrate, currentAudioBitrate) + /** + * Called regularly to get new endpoint metrics + * + * @param metrics endpoint metrics + * @param currentVideoBitrate current video bitrate target in bits/s. + * @param currentAudioBitrate current audio bitrate target in bits/s. + */ + override fun update( + metrics: EndpointMetrics<*>, + currentVideoBitrate: Int, + currentAudioBitrate: Int + ) { + update(metrics as SrtEndpointMetrics, currentVideoBitrate, currentAudioBitrate) + } /** - * Call regularly to get new SRT stats + * Called regularly to get new SRT endpoint metrics * - * @param stats SRT transmission stats + * @param metrics SRT endpoint metrics * @param currentVideoBitrate current video bitrate target in bits/s. * @param currentAudioBitrate current audio bitrate target in bits/s. */ - abstract fun update(stats: Stats, currentVideoBitrate: Int, currentAudioBitrate: Int) + abstract fun update(metrics: SrtEndpointMetrics, currentVideoBitrate: Int, currentAudioBitrate: Int) /** * Factory interface you must use to create a [SrtBitrateRegulator] object. diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/DefaultSrtBitrateRegulatorController.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/SrtBitrateRegulatorControllerFactories.kt similarity index 83% rename from extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/DefaultSrtBitrateRegulatorController.kt rename to extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/SrtBitrateRegulatorControllerFactories.kt index 5d56a545a..c7537284d 100644 --- a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/DefaultSrtBitrateRegulatorController.kt +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/regulator/controllers/SrtBitrateRegulatorControllerFactories.kt @@ -16,26 +16,26 @@ package io.github.thibaultbee.streampack.ext.srt.regulator.controllers import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig -import io.github.thibaultbee.streampack.core.regulator.controllers.SimpleBitrateRegulatorController -import io.github.thibaultbee.streampack.core.regulator.controllers.SimpleBitrateRegulatorController.Companion.DEFAULT_POLLING_TIME_IN_MS +import io.github.thibaultbee.streampack.core.regulator.controllers.IntervalBitrateRegulatorController +import io.github.thibaultbee.streampack.core.regulator.controllers.IntervalBitrateRegulatorController.Companion.DEFAULT_POLLING_TIME_IN_MS import io.github.thibaultbee.streampack.ext.srt.regulator.DummySrtBitrateRegulator import io.github.thibaultbee.streampack.ext.srt.regulator.SrtBitrateRegulator /** - * A [SimpleBitrateRegulatorController.Factory] for [SrtBitrateRegulator]. + * A [IntervalBitrateRegulatorController.Factory] for [SrtBitrateRegulator]. * * @param bitrateRegulatorFactory the [SrtBitrateRegulator.Factory] implementation. Use it to make your own bitrate regulator. * @param bitrateRegulatorConfig bitrate regulator configuration * @param pollingTimeInMs delay between each call to [IBitrateRegulator.update] * - * @see SimpleBitrateRegulatorController.Factory + * @see IntervalBitrateRegulatorController.Factory * @see DummySrtBitrateRegulator.Factory */ -fun simpleSrtBitrateRegulatorControllerFactory( +fun intervalSrtBitrateRegulatorControllerFactory( bitrateRegulatorFactory: SrtBitrateRegulator.Factory = DummySrtBitrateRegulator.Factory(), bitrateRegulatorConfig: BitrateRegulatorConfig = BitrateRegulatorConfig(), pollingTimeInMs: Long = DEFAULT_POLLING_TIME_IN_MS -) = SimpleBitrateRegulatorController.Factory( +) = IntervalBitrateRegulatorController.Factory( bitrateRegulatorFactory, bitrateRegulatorConfig, pollingTimeInMs diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/utils/SrtEndpointMetrics.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/utils/SrtEndpointMetrics.kt new file mode 100644 index 000000000..59a576b23 --- /dev/null +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/utils/SrtEndpointMetrics.kt @@ -0,0 +1,202 @@ +/* + * Copyright (C) 2026 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.ext.srt.utils + +import io.github.thibaultbee.srtdroid.core.models.Stats +import io.github.thibaultbee.srtdroid.ktx.CoroutineSrtSocket +import io.github.thibaultbee.streampack.core.elements.metrics.EndpointMetrics +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds + +/** + * Creates a [SrtEndpointMetrics] from a [SrtRawMetrics]. + */ +fun SrtEndpointMetrics(rawMetrics: SrtRawMetrics): SrtEndpointMetrics { + val stats = rawMetrics.bstats(clear = false) + return SrtEndpointMetrics( + uptime = stats.msTimeStamp.milliseconds, + packetsSent = stats.pktSentTotal, + packetsReceived = stats.pktRecvTotal, + packetsSendLost = stats.pktSndLossTotal.toLong(), + packetsReceiveLost = stats.pktRcvLossTotal, + packetsRetransmitted = stats.pktRetransTotal, + packetsSendACK = stats.pktSentACKTotal, + packetsReceiveACK = stats.pktRecvACKTotal, + packetsSendNAK = stats.pktSentNAKTotal, + packetsReceiveNAK = stats.pktRecvNAKTotal, + usSndDuration = stats.usSndDurationTotal, + packetsSendDropped = stats.pktSndDropTotal.toLong(), + packetsReceiveDropped = stats.pktRcvDropTotal, + packetsReceiveUndecrypt = stats.pktRcvUndecryptTotal, + bytesSent = stats.byteSentTotal, + bytesReceived = stats.byteRecvTotal, + bytesReceiveLost = stats.byteRcvLossTotal, + bytesRetransmitted = stats.byteRetransTotal, + bytesSendDropped = stats.byteSndDropTotal, + bytesReceiveDropped = stats.byteRcvDropTotal, + bytesReceiveUndecrypt = stats.byteRcvUndecryptTotal, + rawMetrics = rawMetrics + ) +} + +/** + * Specific [EndpointMetrics] for SRT protocol, based on [SrtRawMetrics]. + */ +data class SrtEndpointMetrics( + /** + * The time since the entity is started + */ + override val uptime: Duration, + /** + * The total number of sent data packets, including retransmissions + */ + override val packetsSent: Long, + /** + * The total number of received packets + */ + val packetsReceived: Long, + /** + * The total number of lost packets (sender side) + */ + override val packetsSendLost: Long, + /** + * The total number of lost packets (receiver side) + */ + val packetsReceiveLost: Int, + /** + * The total number of retransmitted packets + */ + val packetsRetransmitted: Int, + /** + * The total number of sent ACK packets + */ + val packetsSendACK: Int, + /** + * The total number of received ACK packets + */ + val packetsReceiveACK: Int, + /** + * The total number of sent NAK packets + */ + val packetsSendNAK: Int, + /** + * The total number of received NAK packets + */ + val packetsReceiveNAK: Int, + /** + * The total time duration when UDT is sending data (idle time exclusive) + */ + val usSndDuration: Long, + /** + * The number of too-late-to-send dropped packets + */ + override val packetsSendDropped: Long, + /** + * The number of too-late-to play missing packets + */ + val packetsReceiveDropped: Int, + /** + * The number of undecrypted packets + */ + val packetsReceiveUndecrypt: Int, + /** + * The total number of sent data bytes, including retransmissions + */ + override val bytesSent: Long, + /** + * The total number of received bytes + */ + val bytesReceived: Long, + /** + * The total number of lost bytes + */ + val bytesReceiveLost: Long, + /** + * The total number of retransmitted bytes + */ + val bytesRetransmitted: Long, + /** + * The number of too-late-to-send dropped bytes + */ + override val bytesSendDropped: Long, + /** + * The number of too-late-to play missing bytes (estimate based on average packet size) + */ + val bytesReceiveDropped: Long, + /** + * The number of undecrypted bytes + */ + val bytesReceiveUndecrypt: Long, + /** + * Raw SRT socket helper + */ + override val rawMetrics: SrtRawMetrics, +) : EndpointMetrics + +/** + * Provides an access to internal SRT APIs. + */ +class SrtRawMetrics internal constructor(private val socketProvider: () -> CoroutineSrtSocket?) { + /** + * Reports the current statistics. + * + * **See Also:** [srt_bstats](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_bstats) + * + * @param clear true if the statistics should be cleared after retrieval + * @return the current [Stats] or null if the socket is not connected + */ + fun bstatsOrNull(clear: Boolean): Stats? { + return socketProvider()?.bstats(clear) + } + + /** + * Reports the current statistics. + * + * **See Also:** [srt_bistats](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_bistats) + * + * @param clear true if the statistics should be cleared after retrieval + * @param instantaneous true if the statistics should use instant data, not moving averages + * @return the current [Stats] if the socket is not connected + */ + fun bistatsOrNull(clear: Boolean, instantaneous: Boolean): Stats? { + return socketProvider()?.bistats(clear, instantaneous) + } +} + +/** + * Reports the current statistics. + * + * **See Also:** [srt_bstats](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_bstats) + * + * @param clear true if the statistics should be cleared after retrieval + * @return the current [Stats] or [SrtStatsHelper.ZERO] if the socket is not connected + */ +fun SrtRawMetrics.bstats(clear: Boolean): Stats { + return bstatsOrNull(clear) ?: SrtStatsHelper.ZERO +} + +/** + * Reports the current statistics. + * + * **See Also:** [srt_bistats](https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_bistats) + * + * @param clear true if the statistics should be cleared after retrieval + * @param instantaneous true if the statistics should use instant data, not moving averages + * @return the current [Stats] or [SrtStatsHelper.ZERO] if the socket is not connected + */ +fun SrtRawMetrics.bistats(clear: Boolean, instantaneous: Boolean): Stats { + return bistatsOrNull(clear, instantaneous) ?: SrtStatsHelper.ZERO +} \ No newline at end of file diff --git a/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/utils/SrtStatsHelper.kt b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/utils/SrtStatsHelper.kt new file mode 100644 index 000000000..e337b3575 --- /dev/null +++ b/extensions/srt/src/main/java/io/github/thibaultbee/streampack/ext/srt/utils/SrtStatsHelper.kt @@ -0,0 +1,107 @@ +/* + * Copyright (C) 2026 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.ext.srt.utils + +import io.github.thibaultbee.srtdroid.core.models.Stats + +object SrtStatsHelper { + val ZERO = Stats( + msTimeStamp = 0L, + pktSentTotal = 0L, + pktRecvTotal = 0L, + pktSndLossTotal = 0, + pktRcvLossTotal = 0, + pktRetransTotal = 0, + pktSentACKTotal = 0, + pktRecvACKTotal = 0, + pktSentNAKTotal = 0, + pktRecvNAKTotal = 0, + usSndDurationTotal = 0, + pktSndDropTotal = 0, + pktRcvDropTotal = 0, + pktRcvUndecryptTotal = 0, + byteSentTotal = 0, + byteRecvTotal = 0, + byteRcvLossTotal = 0, + byteRetransTotal = 0, + byteSndDropTotal = 0, + byteRcvDropTotal = 0, + byteRcvUndecryptTotal = 0, + pktSent = 0, + pktRecv = 0, + pktSndLoss = 0, + pktRcvLoss = 0, + pktRetrans = 0, + pktRcvRetrans = 0, + pktSentACK = 0, + pktRecvACK = 0, + pktSentNAK = 0, + pktRecvNAK = 0, + mbpsSendRate = 0.0, + mbpsRecvRate = 0.0, + usSndDuration = 0, + pktReorderDistance = 0, + pktRcvAvgBelatedTime = 0.0, + pktRcvBelated = 0, + pktSndDrop = 0, + pktRcvDrop = 0, + pktRcvUndecrypt = 0, + byteSent = 0, + byteRecv = 0, + byteRcvLoss = 0, + byteRetrans = 0, + byteSndDrop = 0, + byteRcvDrop = 0, + byteRcvUndecrypt = 0, + usPktSndPeriod = 0.0, + pktFlowWindow = 0, + pktCongestionWindow = 0, + pktFlightSize = 0, + msRTT = 0.0, + mbpsBandwidth = 0.0, + byteAvailSndBuf = 0, + byteAvailRcvBuf = 0, + mbpsMaxBW = 0.0, + byteMSS = 0, + pktSndBuf = 0, + byteSndBuf = 0, + msSndBuf = 0, + msSndTsbPdDelay = 0, + pktRcvBuf = 0, + byteRcvBuf = 0, + msRcvBuf = 0, + msRcvTsbPdDelay = 0, + pktSndFilterExtraTotal = 0, + pktRcvFilterExtraTotal = 0, + pktRcvFilterSupplyTotal = 0, + pktRcvFilterLossTotal = 0, + pktSndFilterExtra = 0, + pktRcvFilterExtra = 0, + pktRcvFilterSupply = 0, + pktRcvFilterLoss = 0, + pktReorderTolerance = 0, + pktSentUniqueTotal = 0, + pktRecvUniqueTotal = 0, + byteSentUniqueTotal = 0, + byteRecvUniqueTotal = 0, + pktSentUnique = 0, + pktRecvUnique = 0, + byteSentUnique = 0, + byteRecvUnique = 0 + ) +} + + diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 2cb2b2401..feda42b74 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -27,7 +27,7 @@ kotlinxIo = "0.8.0" material = "1.13.0" mockk = "1.14.5" robolectric = "4.16" -komuxer = "0.3.4" +komuxer = "0.4.0" srtdroid = "1.9.5" junitKtx = "1.3.0" compose = "1.10.1" diff --git a/settings.gradle.kts b/settings.gradle.kts index de694da52..16ddf629a 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -15,6 +15,7 @@ dependencyResolutionManagement { repositoriesMode.set(RepositoriesMode.FAIL_ON_PROJECT_REPOS) repositories { google() + mavenLocal() mavenCentral() } }