Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,6 @@ open class CombineEndpoint(
.reduce { acc, iEndpointInfo -> acc intersect iEndpointInfo }
}

/**
* Throws [UnsupportedOperationException] because [CombineEndpoint] does not have metrics.
*
* Call [IEndpoint.metrics] on each endpoint to get their metrics.
*/
override val metrics: Any
get() = throw UnsupportedOperationException("CombineEndpoint does not have metrics.")

private fun createNewStreamId(): Int {
var i = 0
while (endpointsToStreamIdsMap.keys.any { it.second == i }) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ class DummyEndpoint : IEndpointInternal {
TODO("Not yet implemented")
}

override val metrics: Any
get() = TODO("Not yet implemented")
override val throwableFlow: StateFlow<Throwable?> = MutableStateFlow(null).asStateFlow()

override suspend fun open(descriptor: MediaDescriptor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxer
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.data.TSServiceInfo
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.ContentSink
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.FileSink
import io.github.thibaultbee.streampack.core.elements.interfaces.EndpointMetrics
import io.github.thibaultbee.streampack.core.elements.interfaces.WithEndpointMetrics
import io.github.thibaultbee.streampack.core.elements.utils.ConflatedJob
import io.github.thibaultbee.streampack.core.logger.Logger
import io.github.thibaultbee.streampack.core.pipelines.IDispatcherProvider
Expand All @@ -48,7 +50,7 @@ open class DynamicEndpoint(
private val context: Context,
private val defaultDispatcher: CoroutineDispatcher,
private val ioDispatcher: CoroutineDispatcher
) : IEndpointInternal {
) : IEndpointInternal, WithEndpointMetrics {
private val coroutineScope = CoroutineScope(defaultDispatcher)
private val mutex = Mutex()

Expand Down Expand Up @@ -84,8 +86,12 @@ open class DynamicEndpoint(

override fun getInfo(type: MediaDescriptor.Type) = getEndpoint(type).getInfo(type)

override val metrics: Any
get() = endpoint?.metrics ?: throw IllegalStateException("Endpoint is not opened")
override val metrics: EndpointMetrics<*>
get() {
val endpoint = endpoint ?: throw IllegalStateException("Endpoint is not opened")
return (endpoint as? WithEndpointMetrics)?.metrics
?: throw UnsupportedOperationException("Current endpoint does not support metrics")
}

init {
coroutineScope.launch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,4 @@ interface IEndpoint {
val supportedEncoders: List<String>
}
}

/**
* Metrics of the endpoint.
*/
val metrics: Any
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ class MediaMuxerEndpoint(

override fun getInfo(type: MediaDescriptor.Type) = Companion.getInfo(type)

override val metrics: Any
get() = TODO("Not yet implemented")

private val _isOpenFlow = MutableStateFlow(false)
override val isOpenFlow = _isOpenFlow.asStateFlow()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ class CompositeEndpoint(
override val info by lazy { EndpointInfo(muxer.info) }
override fun getInfo(type: MediaDescriptor.Type) = info

override val metrics: Any
get() = sink.metrics

init {
muxer.listener = object :
IMuxerInternal.IMuxerListener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ import io.github.thibaultbee.streampack.core.logger.Logger
abstract class AbstractSink : ISinkInternal {
abstract val supportedSinkTypes: List<MediaSinkType>

override val metrics: Any
get() = TODO("Not yet implemented")

override suspend fun open(mediaDescriptor: MediaDescriptor) {
if (isOpenFlow.value) {
Logger.w(TAG, "Sink is already opened")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,4 @@ interface ISink {
* For example, if the file is opened for [FileSink].
*/
val isOpenFlow: StateFlow<Boolean>

/**
* Metrics of the sink.
*/
val metrics: Any
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (C) 2026 Thibault B.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.thibaultbee.streampack.core.elements.interfaces

import kotlin.time.Duration

/**
* Minimal metrics interface
*/
interface WithMetrics {
/**
* Metrics of the element.
*/
val metrics: Any
}

/**
* Endpoint metrics interface
*/
interface EndpointMetrics<out T : Any> {
/**
* The duration of the interval
*/
val uptime: Duration

/**
* The number of packets sent.
*/
val packetsSent: Long

/**
* The number of packets dropped before sending (e.g. due to congestion or timeout).
*/
val packetsSendDropped: Long

/**
* The number of packets lost during the transmission.
*/
val packetsSendLost: Long

/**
* The number of bytes successfully sent.
*/
val bytesSent: Long

/**
* The number of bytes dropped before sending (e.g. due to congestion or timeout).
*/
val bytesSendDropped: Long

/**
* The protocol-specific metrics wrapper.
*/
val rawMetrics: T
}

/**
* The total sent bitrate in bits per second (bps).
*/
val EndpointMetrics<*>.totalSentBitrateInBps: Long
get() = uptime.inWholeMilliseconds.let { if (it == 0L) 0L else (bytesSent * 8000) / it }

/**
* A specific [WithMetrics] for [EndpointMetrics].
*/
interface WithEndpointMetrics : WithMetrics {
/**
* Metrics of the element
*/
override val metrics: EndpointMetrics<*>
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import io.github.thibaultbee.streampack.core.elements.sources.video.camera.exten
import io.github.thibaultbee.streampack.core.elements.sources.video.camera.extensions.sensitivityRange
import io.github.thibaultbee.streampack.core.elements.sources.video.camera.extensions.zoomRatioRange
import io.github.thibaultbee.streampack.core.elements.sources.video.camera.utils.CaptureResultListener
import io.github.thibaultbee.streampack.core.elements.utils.extensions.clamp
import io.github.thibaultbee.streampack.core.elements.utils.extensions.coerceIn
import io.github.thibaultbee.streampack.core.elements.utils.extensions.isApplicationPortrait
import io.github.thibaultbee.streampack.core.elements.utils.extensions.isNormalized
import io.github.thibaultbee.streampack.core.elements.utils.extensions.launchIn
Expand Down Expand Up @@ -533,7 +533,7 @@ class CameraSettings internal constructor(
suspend fun setSensorSensitivity(sensorSensitivity: Int) {
cameraSettings.set(
CaptureRequest.SENSOR_SENSITIVITY,
sensorSensitivity.clamp(availableSensorSensitivityRange)
sensorSensitivity.coerceIn(availableSensorSensitivityRange)
)
cameraSettings.applyRepeatingSession()
}
Expand Down Expand Up @@ -693,7 +693,7 @@ class CameraSettings internal constructor(
suspend fun setCompensation(compensation: Int) {
cameraSettings.set(
CaptureRequest.CONTROL_AE_EXPOSURE_COMPENSATION,
compensation.clamp(availableCompensationRange)
compensation.coerceIn(availableCompensationRange)
)
cameraSettings.applyRepeatingSession()
}
Expand Down Expand Up @@ -777,7 +777,7 @@ class CameraSettings internal constructor(
suspend fun onPinch(scale: Float) {
val scaledRatio: Float = getZoomRatio() * speedUpZoomByX(scale, 2)
// Clamp the ratio with the zoom range.
setZoomRatio(scaledRatio.clamp(availableRatioRange.lower, availableRatioRange.upper))
setZoomRatio(scaledRatio.coerceIn(availableRatioRange.lower, availableRatioRange.upper))
}

private fun speedUpZoomByX(scaleFactor: Float, ratio: Int): Float {
Expand Down Expand Up @@ -825,7 +825,7 @@ class CameraSettings internal constructor(
@RequiresPermission(Manifest.permission.CAMERA)
override suspend fun setZoomRatio(zoomRatio: Float) {
mutex.withLock {
val clampedValue = zoomRatio.clamp(availableRatioRange)
val clampedValue = zoomRatio.coerceIn(availableRatioRange)
if (clampedValue == persistentZoomRatio) {
return@withLock
}
Expand Down Expand Up @@ -905,7 +905,7 @@ class CameraSettings internal constructor(
return
}
cameraSettings.set(
CaptureRequest.CONTROL_ZOOM_RATIO, zoomRatio.clamp(availableRatioRange)
CaptureRequest.CONTROL_ZOOM_RATIO, zoomRatio.coerceIn(availableRatioRange)
)
cameraSettings.applyRepeatingSession()
notifyZoomListeners(zoomRatio)
Expand Down Expand Up @@ -1021,7 +1021,7 @@ class CameraSettings internal constructor(
@RequiresPermission(Manifest.permission.CAMERA)
suspend fun setLensDistance(lensDistance: Float) {
cameraSettings.set(
CaptureRequest.LENS_FOCUS_DISTANCE, lensDistance.clamp(availableLensDistanceRange)
CaptureRequest.LENS_FOCUS_DISTANCE, lensDistance.coerceIn(availableLensDistanceRange)
)
cameraSettings.applyRepeatingSession()
}
Expand Down Expand Up @@ -1618,10 +1618,10 @@ class CameraSettings internal constructor(
(centerY + height / 2).toInt()
)

focusRect.left = focusRect.left.clamp(cropRegion.right, cropRegion.left)
focusRect.right = focusRect.right.clamp(cropRegion.right, cropRegion.left)
focusRect.top = focusRect.top.clamp(cropRegion.bottom, cropRegion.top)
focusRect.bottom = focusRect.bottom.clamp(cropRegion.bottom, cropRegion.top)
focusRect.left = focusRect.left.coerceIn(cropRegion.right, cropRegion.left)
focusRect.right = focusRect.right.coerceIn(cropRegion.right, cropRegion.left)
focusRect.top = focusRect.top.coerceIn(cropRegion.bottom, cropRegion.top)
focusRect.bottom = focusRect.bottom.coerceIn(cropRegion.bottom, cropRegion.top)

return MeteringRectangle(focusRect, DEFAULT_METERING_WEIGHT_MAX)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,14 @@ import java.io.Closeable
*/
class ChannelWithCloseableData<T>(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((T) -> Unit) = {}
) : ReceiveChannel<ChannelWithCloseableData.CloseableData<T>> {
private val channel =
Channel<CloseableData<T>>(capacity, onBufferOverflow, onUndeliveredElement = { it.close() })
Channel<CloseableData<T>>(capacity, onBufferOverflow, onUndeliveredElement = {
it.close()
onUndeliveredElement(it.data)
})

/**
* Sends data along with a close action to the channel.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ internal fun Any.numOfBits(): Int {
}
}

internal fun <T : Comparable<T>> T.clamp(min: T, max: T): T {
return if (max >= min) {
if (this < min) min else if (this > max) max else this
} else {
if (this < max) max else if (this > min) min else this
internal fun <T : Comparable<T>> T.coerceIn(min: T, max: T): T {
return when {
this < min -> min
this > max -> max
else -> this
}
}

internal fun <T : Comparable<T>> T.clamp(range: Range<T>) =
this.clamp(range.lower, range.upper)
internal fun <T : Comparable<T>> T.coerceIn(range: Range<T>) =
this.coerceIn(range.lower, range.upper)

internal val PointF.isNormalized: Boolean
get() = x in 0f..1f && y in 0f..1f
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfi
*/
interface IBitrateRegulator {
/**
* Calls regularly to get new stats
* Calls regularly to get new metrics
*
* @param stats transmission stats
* @param metrics transmission metrics
* @param currentVideoBitrate current video bitrate target in bits/s.
* @param currentAudioBitrate current audio bitrate target in bits/s.
*/
fun update(stats: Any, currentVideoBitrate: Int, currentAudioBitrate: Int)
fun update(metrics: Any, currentVideoBitrate: Int, currentAudioBitrate: Int)

/**
* Factory interface you must use to create a [BitrateRegulator] object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package io.github.thibaultbee.streampack.core.regulator.controllers

import io.github.thibaultbee.streampack.core.configuration.BitrateRegulatorConfig
import io.github.thibaultbee.streampack.core.elements.encoders.IEncoder
import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpoint
import io.github.thibaultbee.streampack.core.elements.interfaces.WithMetrics
import io.github.thibaultbee.streampack.core.pipelines.outputs.encoding.IEncodingPipelineOutput
import io.github.thibaultbee.streampack.core.regulator.IBitrateRegulator
import kotlinx.coroutines.CoroutineDispatcher
Expand All @@ -27,14 +27,14 @@ import kotlinx.coroutines.CoroutineDispatcher
*
* @param audioEncoder the audio [IEncoder]
* @param videoEncoder the video [IEncoder]
* @param endpoint the [IEndpoint] implementation
* @param endpoint the [WithMetrics] implementation
* @param bitrateRegulatorFactory the [IBitrateRegulator.Factory] implementation. Use it to make your own bitrate regulator.
* @param bitrateRegulatorConfig bitrate regulator configuration
*/
abstract class BitrateRegulatorController(
private val audioEncoder: IEncoder?,
private val videoEncoder: IEncoder?,
private val endpoint: IEndpoint,
private val endpoint: WithMetrics,
private val bitrateRegulatorFactory: IBitrateRegulator.Factory,
private val bitrateRegulatorConfig: BitrateRegulatorConfig = BitrateRegulatorConfig()
) : IBitrateRegulatorController {
Expand Down
Loading
Loading