Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package io.github.thibaultbee.streampack.ext.rtmp.configuration.mediadescriptor

import android.net.Uri
import androidx.core.net.toUri
import io.github.komedia.komuxer.rtmp.messages.command.ConnectObjectBuilder
import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor
import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.UriMediaDescriptor
import io.github.thibaultbee.streampack.core.elements.endpoints.MediaContainerType
Expand All @@ -30,19 +31,18 @@ import java.security.InvalidParameterException
* Otherwise, an [InvalidParameterException] will be thrown.
*/
fun RtmpMediaDescriptor(descriptor: MediaDescriptor) =
when (descriptor) {
is RtmpMediaDescriptor -> descriptor
is UriMediaDescriptor -> {
RtmpMediaDescriptor.fromUri(descriptor.uri)
}

else -> throw InvalidParameterException("Invalid descriptor ${descriptor::class.java.simpleName} for RTMP")
}
RtmpMediaDescriptor.fromMediaDescriptor(descriptor)

/**
* Creates a RTMP connection descriptor from an [Uri]
*
* @param uri the server uri
* @param connectInfo configures the RTMP connect command sent during the handshake
*/
fun RtmpMediaDescriptor(uri: Uri) = RtmpMediaDescriptor.fromUri(uri)
fun RtmpMediaDescriptor(
uri: Uri,
connectInfo: (ConnectObjectBuilder.() -> Unit)? = null
) = RtmpMediaDescriptor.fromUri(uri, connectInfo)

/**
* RTMP connection parameters
Expand All @@ -52,10 +52,15 @@ fun RtmpMediaDescriptor(uri: Uri) = RtmpMediaDescriptor.fromUri(uri)
* @param port the server port
* @param app the application name
* @param streamKey the stream key
* @param connectInfo configures the RTMP connect command sent during the handshake
*/
class RtmpMediaDescriptor(
val scheme: String, val host: String, val port: Int, val app: String?, val streamKey: String
) : MediaDescriptor(Type(MediaContainerType.FLV, MediaSinkType.RTMP)) {
val scheme: String, val host: String, val port: Int, val app: String?, val streamKey: String,
val connectInfo: (ConnectObjectBuilder.() -> Unit)? = null
) : MediaDescriptor(
Type(MediaContainerType.FLV, MediaSinkType.RTMP),
emptyList()
) {
init {
require(scheme == RTMP_SCHEME || scheme == RTMPS_SCHEME || scheme == RTMPT_SCHEME || scheme == RTMPE_SCHEME || scheme == RTMFP_SCHEME || scheme == RTMPTE_SCHEME || scheme == RTMPTS_SCHEME) { "Invalid scheme $scheme" }
require(host.isNotBlank()) { "Invalid host $host" }
Expand Down Expand Up @@ -91,18 +96,53 @@ class RtmpMediaDescriptor(
* Creates a RTMP connection descriptor from an URL
*
* @param url the server url (syntax: rtmp://host:port/app/streamKey)
* @param connectInfo configures the RTMP connect command sent during the handshake
* @return RTMP connection descriptor
*/
fun fromUrl(url: String) =
fromUri(url.toUri())
internal fun fromUrl(
url: String,
connectInfo: (ConnectObjectBuilder.() -> Unit)? = null
) = fromUri(url.toUri(), connectInfo)

/**
* Creates a RTMP connection descriptor from an Uri
*
* @param uri the server Uri
* @param connectInfo configures the RTMP connect command sent during the handshake
* @return RTMP connection descriptor
*/
internal fun fromUri(
uri: Uri,
connectInfo: (ConnectObjectBuilder.() -> Unit)? = null
): RtmpMediaDescriptor = parse(uri, connectInfo)

/**
* Creates a RTMP connection descriptor from a generic [MediaDescriptor].
*
* If the descriptor is already a [RtmpMediaDescriptor], it is returned as is.
* If the descriptor is an [UriMediaDescriptor], it will be converted to a [RtmpMediaDescriptor].
* Otherwise, an [InvalidParameterException] will be thrown.
*
* @param descriptor the source descriptor
* @return RTMP connection descriptor
*/
fun fromUri(uri: Uri): RtmpMediaDescriptor {
internal fun fromMediaDescriptor(descriptor: MediaDescriptor): RtmpMediaDescriptor =
when (descriptor) {
is RtmpMediaDescriptor -> descriptor
is UriMediaDescriptor -> {
parse(
descriptor.uri,
null
)
}

else -> throw InvalidParameterException("Invalid descriptor ${descriptor::class.java.simpleName} for RTMP")
}

private fun parse(
uri: Uri,
connectInfo: (ConnectObjectBuilder.() -> Unit)?
): RtmpMediaDescriptor {
val scheme =
uri.scheme ?: throw InvalidParameterException("Invalid scheme ${uri.scheme}")
val host = uri.host ?: throw InvalidParameterException("Invalid host ${uri.host}")
Expand All @@ -121,12 +161,12 @@ class RtmpMediaDescriptor(
val streamKey = uri.lastPathSegment
?: throw InvalidParameterException("Invalid streamKey ${uri.lastPathSegment}")
if (uri.pathSegments.size == 1) {
return RtmpMediaDescriptor(scheme, host, port, null, streamKey)
return RtmpMediaDescriptor(scheme, host, port, null, streamKey, connectInfo)
} else {
val app = uri.pathSegments.minus(uri.lastPathSegment).joinToString("/")
return RtmpMediaDescriptor(scheme, host, port, app, streamKey)
return RtmpMediaDescriptor(scheme, host, port, app, streamKey, connectInfo)
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import android.content.Context
import io.github.komedia.komuxer.flv.tags.FLVTag
import io.github.komedia.komuxer.rtmp.RtmpConnectionBuilder
import io.github.komedia.komuxer.rtmp.client.RtmpClient
import io.github.komedia.komuxer.rtmp.client.RtmpClientSettings
import io.github.komedia.komuxer.rtmp.connect
import io.github.komedia.komuxer.rtmp.messages.command.StreamPublishType
import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor
Expand All @@ -35,6 +36,7 @@ import io.github.thibaultbee.streampack.core.pipelines.IDispatcherProvider
import io.github.thibaultbee.streampack.ext.flv.elements.endpoints.composites.muxer.FlvMuxerInfo
import io.github.thibaultbee.streampack.ext.flv.elements.endpoints.composites.muxer.utils.FlvTagBuilder
import io.github.thibaultbee.streampack.ext.flv.elements.endpoints.composites.muxer.utils.close
import io.github.thibaultbee.streampack.ext.rtmp.configuration.mediadescriptor.RtmpMediaDescriptor
import io.ktor.network.selector.SelectorManager
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
Expand Down Expand Up @@ -101,15 +103,26 @@ class RtmpEndpoint internal constructor(
return mutex.withLock { block(rtmpClient) }
}

override suspend fun open(descriptor: MediaDescriptor) {
override suspend fun open(descriptor: MediaDescriptor) =
open(RtmpMediaDescriptor(descriptor))

private suspend fun open(descriptor: RtmpMediaDescriptor) {
withContext(ioDispatcher) {
mutex.withLock {
if (rtmpClient?.isClosed == false) {
Logger.w(TAG, "Already opened")
return@withContext
}

rtmpClient = connectionBuilder.connect(descriptor.uri.toString()).apply {
val client = if (descriptor.connectInfo != null) {
connectionBuilder.connect(
descriptor.uri.toString(),
RtmpClientSettings(connectInfo = descriptor.connectInfo)
)
} else {
connectionBuilder.connect(descriptor.uri.toString())
}
rtmpClient = client.apply {
_isOpenFlow.emit(true)

socketContext.invokeOnCompletion { throwable ->
Expand Down