From ae95ca2132814af8db7f81e5dd79ccfadb7f8c05 Mon Sep 17 00:00:00 2001 From: rahullohra Date: Tue, 5 May 2026 02:10:08 +0530 Subject: [PATCH 1/6] feat: add RingingCallJoinInterceptor --- .../api/stream-video-android-core.api | 13 +- .../video/android/core/ActiveStateGate.kt | 96 ++++++----- .../io/getstream/video/android/core/Call.kt | 3 + .../getstream/video/android/core/CallState.kt | 8 +- .../core/RingingCallJoinInterceptor.kt | 44 +++++ .../video/android/core/ActiveStateGateTest.kt | 154 +++++++++++++++++- 6 files changed, 273 insertions(+), 45 deletions(-) create mode 100644 stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/RingingCallJoinInterceptor.kt diff --git a/stream-video-android-core/api/stream-video-android-core.api b/stream-video-android-core/api/stream-video-android-core.api index 1680fa52a1..53ee06b506 100644 --- a/stream-video-android-core/api/stream-video-android-core.api +++ b/stream-video-android-core/api/stream-video-android-core.api @@ -8593,8 +8593,8 @@ public final class io/getstream/video/android/core/Call { public final fun isPinnedParticipant (Ljava/lang/String;)Z public final fun isServerPin (Ljava/lang/String;)Z public final fun isVideoEnabled ()Z - public final fun join (ZLio/getstream/video/android/core/CreateCallOptions;ZZLjava/lang/Boolean;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static synthetic fun join$default (Lio/getstream/video/android/core/Call;ZLio/getstream/video/android/core/CreateCallOptions;ZZLjava/lang/Boolean;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public final fun join (ZLio/getstream/video/android/core/CreateCallOptions;ZZLjava/lang/Boolean;Lio/getstream/video/android/core/RingingCallJoinInterceptor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun join$default (Lio/getstream/video/android/core/Call;ZLio/getstream/video/android/core/CreateCallOptions;ZZLjava/lang/Boolean;Lio/getstream/video/android/core/RingingCallJoinInterceptor;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public final fun joinAndRing (Ljava/util/List;Lio/getstream/video/android/core/CreateCallOptions;ZLkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun joinAndRing$default (Lio/getstream/video/android/core/Call;Ljava/util/List;Lio/getstream/video/android/core/CreateCallOptions;ZLkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public final fun kickUser (Ljava/lang/String;ZLkotlin/coroutines/Continuation;)Ljava/lang/Object; @@ -8951,6 +8951,11 @@ public final class io/getstream/video/android/core/CreateCallOptions { public fun toString ()Ljava/lang/String; } +public final class io/getstream/video/android/core/DefaultRingingCallJoinInterceptor : io/getstream/video/android/core/RingingCallJoinInterceptor { + public static final field INSTANCE Lio/getstream/video/android/core/DefaultRingingCallJoinInterceptor; + public fun callReadyToJoinWithTimeout (Lio/getstream/video/android/core/Call;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + public abstract class io/getstream/video/android/core/DeviceStatus { } @@ -9317,6 +9322,10 @@ public final class io/getstream/video/android/core/RealtimeConnection$Reconnecti public fun toString ()Ljava/lang/String; } +public abstract interface class io/getstream/video/android/core/RingingCallJoinInterceptor { + public abstract fun callReadyToJoinWithTimeout (Lio/getstream/video/android/core/Call;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + public abstract interface class io/getstream/video/android/core/RingingState { } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt index 8f56483f88..f10e2f17bd 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt @@ -17,8 +17,10 @@ package io.getstream.video.android.core import io.getstream.log.taggedLogger +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filterNotNull @@ -28,9 +30,10 @@ import kotlinx.coroutines.flow.map import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.withTimeoutOrNull -import org.webrtc.PeerConnection +import org.webrtc.PeerConnection.PeerConnectionState private const val PEER_CONNECTION_OBSERVER_TIMEOUT = 5_000L +private const val INTERCEPTOR_TIMEOUT_MS = 5_000L internal class ActiveStateGate( private val coroutineScope: CoroutineScope, @@ -44,6 +47,7 @@ internal class ActiveStateGate( internal fun awaitAndTransition( currentRingingState: RingingState, call: Call, + interceptor: RingingCallJoinInterceptor, onReady: () -> Unit, ) { logger.d { "[awaitAndTransition], ringingState: $currentRingingState" } @@ -57,11 +61,7 @@ internal class ActiveStateGate( previousRingingStates.any { it is RingingState.Incoming || it is RingingState.Outgoing } if (isIncomingOrOutgoing && currentRingingState !is RingingState.Active) { - observePeerConnection( - call, - onReady, - strategy, - ) + observePeerConnection(call, interceptor, onReady, strategy) } else if (!isIncomingOrOutgoing) { onReady() } @@ -69,51 +69,67 @@ internal class ActiveStateGate( } } - private fun observePeerConnection(call: Call, onReady: () -> Unit, strategy: TransitionToRingingStateStrategy) { + private fun observePeerConnection( + call: Call, + interceptor: RingingCallJoinInterceptor, + onReady: () -> Unit, + strategy: TransitionToRingingStateStrategy, + ) { if (peerConnectionObserverJob?.isActive == true) return peerConnectionObserverJob = coroutineScope.launch { val start = System.currentTimeMillis() + val result = + withTimeoutOrNull(timeoutMs) { buildConnectionFlow(call, strategy).first() } + val duration = System.currentTimeMillis() - start - val result = withTimeoutOrNull(timeoutMs) { - call.session - .filterNotNull() - .flatMapLatest { session -> - - val publisherFlow = session.publisher - .filterNotNull() - .flatMapLatest { it.state } + logConnectionResult(result, strategy, duration) - when (strategy) { - TransitionToRingingStateStrategy.LEGACY_BEHAVIOUR -> { - emptyFlow() - .map { "none" to it } - } - TransitionToRingingStateStrategy.PUBLISHER_CONNECTED -> { - publisherFlow.filter { it == PeerConnection.PeerConnectionState.CONNECTED } - .map { "publisher" to it } - } - } - } - .first() + if (isActive) { + invokeInterceptorSafely(call, interceptor) + onReady() + cleanup() } + } + } - val duration = System.currentTimeMillis() - start - - if (result != null) { - val (source, state) = result - logger.d { - "[observeConnection-$strategy] $source reached $state in ${duration}ms" - } - } else { - logger.w { - "[observeConnection-$strategy] Timeout after ${duration}ms" + private fun buildConnectionFlow( + call: Call, + strategy: TransitionToRingingStateStrategy, + ): Flow = + call.session + .filterNotNull() + .flatMapLatest { session -> + val publisherFlow = session.publisher + .filterNotNull() + .flatMapLatest { it.state } + when (strategy) { + TransitionToRingingStateStrategy.LEGACY_BEHAVIOUR -> + emptyFlow() + TransitionToRingingStateStrategy.PUBLISHER_CONNECTED -> + publisherFlow + .filter { it == PeerConnectionState.CONNECTED } + .map { } } } - if (isActive) { - onReady() - cleanup() + + private suspend fun invokeInterceptorSafely(call: Call, interceptor: RingingCallJoinInterceptor) { + try { + withTimeoutOrNull(INTERCEPTOR_TIMEOUT_MS) { + interceptor.callReadyToJoinWithTimeout(call) } + } catch (e: CancellationException) { + throw e + } catch (e: Exception) { + logger.e(e) { "[RingingCallJoinInterceptor] interceptor threw, proceeding to Active" } + } + } + + private fun logConnectionResult(result: Unit?, strategy: TransitionToRingingStateStrategy, duration: Long) { + if (result != null) { + logger.d { "[observeConnection-$strategy] Connected in ${duration}ms" } + } else { + logger.w { "[observeConnection-$strategy] Timeout after ${duration}ms" } } } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt index e1f2e74142..db65a11a7a 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt @@ -544,6 +544,7 @@ public class Call( ring: Boolean = false, notify: Boolean = false, hintHighScaleLivestreamPublisher: Boolean? = null, + ringingCallJoinInterceptor: RingingCallJoinInterceptor = DefaultRingingCallJoinInterceptor, ): Result { logger.d { "[join] #ringing; #track; create: $create, ring: $ring, notify: $notify, createOptions: $createOptions" @@ -567,6 +568,8 @@ public class Call( // Ensure factory is created with the current audioBitrateProfile before joining ensureFactoryMatchesAudioProfile() + this.state.ringingCallJoinInterceptor = ringingCallJoinInterceptor + // the join flow should retry up to 3 times // if the error is not permanent // and fail immediately on permanent errors diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt index c9812c7ad6..ce23fab7f8 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt @@ -751,6 +751,8 @@ public class CallState( internal var incomingNotificationData = IncomingNotificationData(emptyMap()) private val ringingLogger by taggedLogger("RingingState") + internal var ringingCallJoinInterceptor: RingingCallJoinInterceptor = DefaultRingingCallJoinInterceptor + fun handleEvent(event: VideoEvent) { logger.d { "[handleEvent] ${event::class.java.name.split(".").last()}" } @@ -1381,7 +1383,11 @@ public class CallState( ringingLogger.d { "Update: $state" } if (state is RingingState.Active) { - activeStateGate.awaitAndTransition(ringingState.value, call) { + activeStateGate.awaitAndTransition( + ringingState.value, + call, + ringingCallJoinInterceptor, + ) { _ringingState.value = state activeStateGate.cleanup() } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/RingingCallJoinInterceptor.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/RingingCallJoinInterceptor.kt new file mode 100644 index 0000000000..83f1560509 --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/RingingCallJoinInterceptor.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core + +/** + * Controls when a ringing call transitions to [RingingState.Active]. + * + * Implement this to insert custom logic (e.g. waiting for user confirmation) between + * the publisher peer connection becoming ready and the call going active. + * Has no effect on non-ringing joins (livestream, direct join). + * + * @see DefaultRingingCallJoinInterceptor + */ +public interface RingingCallJoinInterceptor { + + /** + * Called when the SDK is ready to transition to [RingingState.Active]. + * Suspend here to delay the transition; return to allow it to proceed. + * + * The SDK enforces a 5-second maximum — the transition proceeds automatically on timeout. + */ + public suspend fun callReadyToJoinWithTimeout(call: Call) +} + +/** + * Default implementation that proceeds to [RingingState.Active] immediately. + */ +public object DefaultRingingCallJoinInterceptor : RingingCallJoinInterceptor { + override suspend fun callReadyToJoinWithTimeout(call: Call) {} +} diff --git a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt index 9d8b0de5b3..59b2a79b70 100644 --- a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt +++ b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt @@ -22,6 +22,7 @@ import io.getstream.video.android.core.call.connection.Subscriber import io.mockk.every import io.mockk.mockk import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.test.StandardTestDispatcher import kotlinx.coroutines.test.TestScope @@ -35,6 +36,7 @@ import org.junit.After import org.junit.Before import org.webrtc.PeerConnection import kotlin.test.Test +import kotlin.test.assertEquals import kotlin.test.assertTrue class ActiveStateGateTest { @@ -198,8 +200,8 @@ class ActiveStateGateTest { val transitioned = mutableListOf() val action = { transitioned += Unit } - sut.awaitAndTransition(incomingRingingState, call, action) - sut.awaitAndTransition(incomingRingingState, call, action) + sut.awaitAndTransition(incomingRingingState, call, onReady = action) + sut.awaitAndTransition(incomingRingingState, call, onReady = action) pubState.value = PeerConnection.PeerConnectionState.CONNECTED subState.value = PeerConnection.PeerConnectionState.CONNECTED @@ -382,4 +384,152 @@ class ActiveStateGateTest { pubState.value = PeerConnection.PeerConnectionState.CONNECTED assertTrue(transitioned.size == 1) } + + // ── RingingCallJoinInterceptor tests ────────────────────────────────────── + + @Test + fun `interceptor is called before onReady for ringing call`() = + runTest(testDispatcher) { + val pubState: MutableStateFlow = + MutableStateFlow(PeerConnection.PeerConnectionState.NEW) + val (call, _, _, _) = fakeCall(pubState) + + val interceptorCalled = mutableListOf() + val interceptor = object : RingingCallJoinInterceptor { + override suspend fun callReadyToJoinWithTimeout(call: Call) { + interceptorCalled += Unit + } + } + + val incoming = RingingState.Incoming(false) + val sut = ActiveStateGate( + coroutineScope = this, + previousRingingStates = setOf(incoming), + timeoutMs = 5_000L, + ) + val transitioned = mutableListOf() + + sut.awaitAndTransition(incoming, call, interceptor) { transitioned += Unit } + pubState.value = PeerConnection.PeerConnectionState.CONNECTED + + assertEquals(1, interceptorCalled.size) + assertEquals(1, transitioned.size) + } + + @Test + fun `delaying interceptor postpones onReady`() = + runTest(StandardTestDispatcher()) { + val pubState: MutableStateFlow = + MutableStateFlow(PeerConnection.PeerConnectionState.NEW) + val (call, _, _, _) = fakeCall(pubState) + + val interceptor = object : RingingCallJoinInterceptor { + override suspend fun callReadyToJoinWithTimeout(call: Call) { + delay(1_000L) + } + } + + val incoming = RingingState.Incoming(false) + val sut = ActiveStateGate( + coroutineScope = this, + previousRingingStates = setOf(incoming), + timeoutMs = 5_000L, + ) + val transitioned = mutableListOf() + + sut.awaitAndTransition(incoming, call, interceptor) { transitioned += Unit } + pubState.value = PeerConnection.PeerConnectionState.CONNECTED + // Advance just enough for the peer connection detection to process, + // but not enough to drain the interceptor's delay(1_000L) + advanceTimeBy(10L) + + // onReady not yet called — interceptor is still suspending + assertTrue(transitioned.isEmpty()) + + advanceTimeBy(1_100L) + assertTrue(transitioned.size == 1) + } + + @Test + fun `interceptor exceeding 5s timeout still proceeds to Active`() = + runTest(StandardTestDispatcher()) { + val pubState: MutableStateFlow = + MutableStateFlow(PeerConnection.PeerConnectionState.NEW) + val (call, _, _, _) = fakeCall(pubState) + + val interceptor = object : RingingCallJoinInterceptor { + override suspend fun callReadyToJoinWithTimeout(call: Call) { + delay(Long.MAX_VALUE) // never returns on its own + } + } + + val incoming = RingingState.Incoming(false) + val sut = ActiveStateGate( + coroutineScope = this, + previousRingingStates = setOf(incoming), + timeoutMs = 100L, + ) + val transitioned = mutableListOf() + + sut.awaitAndTransition(incoming, call, interceptor) { transitioned += Unit } + pubState.value = PeerConnection.PeerConnectionState.CONNECTED + // Advance past the 100ms peer connection timeout so the interceptor starts, + // but not past the 5s interceptor timeout + advanceTimeBy(200L) + + assertTrue(transitioned.isEmpty()) + + advanceTimeBy(5_100L) // past 5s interceptor timeout + assertTrue(transitioned.size == 1) + } + + @Test + fun `interceptor throwing exception still proceeds to Active`() = + runTest(testDispatcher) { + val pubState: MutableStateFlow = + MutableStateFlow(PeerConnection.PeerConnectionState.NEW) + val (call, _, _, _) = fakeCall(pubState) + + val interceptor = object : RingingCallJoinInterceptor { + override suspend fun callReadyToJoinWithTimeout(call: Call) { + throw RuntimeException("interceptor error") + } + } + + val incoming = RingingState.Incoming(false) + val sut = ActiveStateGate( + coroutineScope = this, + previousRingingStates = setOf(incoming), + timeoutMs = 5_000L, + ) + val transitioned = mutableListOf() + + sut.awaitAndTransition(incoming, call, interceptor) { transitioned += Unit } + pubState.value = PeerConnection.PeerConnectionState.CONNECTED + + assertEquals(1, transitioned.size) + } + + @Test + fun `interceptor is NOT called for non-ringing Active transitions`() = + runTest(testDispatcher) { + val interceptorCalled = mutableListOf() + val interceptor = object : RingingCallJoinInterceptor { + override suspend fun callReadyToJoinWithTimeout(call: Call) { + interceptorCalled += Unit + } + } + + val (call, _, _, _) = fakeCall() + val sut = ActiveStateGate( + coroutineScope = this, + previousRingingStates = emptySet(), // no incoming/outgoing history + ) + val transitioned = mutableListOf() + + sut.awaitAndTransition(RingingState.Idle, call, interceptor) { transitioned += Unit } + + assertTrue(interceptorCalled.isEmpty()) + assertEquals(1, transitioned.size) // onReady called directly + } } From 85d591297547f1afaf23c9f501edc087ddd6942d Mon Sep 17 00:00:00 2001 From: rahullohra Date: Tue, 5 May 2026 02:14:58 +0530 Subject: [PATCH 2/6] feat: remove dead code --- .../video/android/core/ActiveStateGate.kt | 58 ++++++++----------- 1 file changed, 24 insertions(+), 34 deletions(-) diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt index f10e2f17bd..9f947283b6 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt @@ -21,7 +21,6 @@ import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filterNotNull import kotlinx.coroutines.flow.first @@ -51,21 +50,23 @@ internal class ActiveStateGate( onReady: () -> Unit, ) { logger.d { "[awaitAndTransition], ringingState: $currentRingingState" } - when (strategy) { - TransitionToRingingStateStrategy.LEGACY_BEHAVIOUR -> { - onReady() - } - else -> { - val isIncomingOrOutgoing = - previousRingingStates.any { it is RingingState.Incoming || it is RingingState.Outgoing } + if (strategy == TransitionToRingingStateStrategy.LEGACY_BEHAVIOUR) { + onReady() + return + } - if (isIncomingOrOutgoing && currentRingingState !is RingingState.Active) { - observePeerConnection(call, interceptor, onReady, strategy) - } else if (!isIncomingOrOutgoing) { - onReady() - } - } + val isRingingCall = previousRingingStates.any { + it is RingingState.Incoming || it is RingingState.Outgoing + } + + when { + !isRingingCall -> onReady() + currentRingingState !is RingingState.Active -> observePeerConnection( + call, + interceptor, + onReady, + ) } } @@ -73,17 +74,15 @@ internal class ActiveStateGate( call: Call, interceptor: RingingCallJoinInterceptor, onReady: () -> Unit, - strategy: TransitionToRingingStateStrategy, ) { if (peerConnectionObserverJob?.isActive == true) return peerConnectionObserverJob = coroutineScope.launch { val start = System.currentTimeMillis() - val result = - withTimeoutOrNull(timeoutMs) { buildConnectionFlow(call, strategy).first() } + val result = withTimeoutOrNull(timeoutMs) { buildConnectionFlow(call).first() } val duration = System.currentTimeMillis() - start - logConnectionResult(result, strategy, duration) + logConnectionResult(result, duration) if (isActive) { invokeInterceptorSafely(call, interceptor) @@ -93,24 +92,15 @@ internal class ActiveStateGate( } } - private fun buildConnectionFlow( - call: Call, - strategy: TransitionToRingingStateStrategy, - ): Flow = + private fun buildConnectionFlow(call: Call): Flow = call.session .filterNotNull() .flatMapLatest { session -> - val publisherFlow = session.publisher + session.publisher .filterNotNull() .flatMapLatest { it.state } - when (strategy) { - TransitionToRingingStateStrategy.LEGACY_BEHAVIOUR -> - emptyFlow() - TransitionToRingingStateStrategy.PUBLISHER_CONNECTED -> - publisherFlow - .filter { it == PeerConnectionState.CONNECTED } - .map { } - } + .filter { it == PeerConnectionState.CONNECTED } + .map { } } private suspend fun invokeInterceptorSafely(call: Call, interceptor: RingingCallJoinInterceptor) { @@ -125,11 +115,11 @@ internal class ActiveStateGate( } } - private fun logConnectionResult(result: Unit?, strategy: TransitionToRingingStateStrategy, duration: Long) { + private fun logConnectionResult(result: Unit?, duration: Long) { if (result != null) { - logger.d { "[observeConnection-$strategy] Connected in ${duration}ms" } + logger.d { "[observeConnection] Connected in ${duration}ms" } } else { - logger.w { "[observeConnection-$strategy] Timeout after ${duration}ms" } + logger.w { "[observeConnection] Timeout after ${duration}ms" } } } From a0af1257c5d2273f402393a05f7d85586276a9c8 Mon Sep 17 00:00:00 2001 From: rahullohra Date: Tue, 5 May 2026 11:20:07 +0530 Subject: [PATCH 3/6] feat: update test cases --- .../video/android/core/ActiveStateGateTest.kt | 135 ++++++++---------- 1 file changed, 58 insertions(+), 77 deletions(-) diff --git a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt index 59b2a79b70..7ce9ee85c1 100644 --- a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt +++ b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt @@ -18,7 +18,6 @@ package io.getstream.video.android.core import io.getstream.video.android.core.call.RtcSession import io.getstream.video.android.core.call.connection.Publisher -import io.getstream.video.android.core.call.connection.Subscriber import io.mockk.every import io.mockk.mockk import kotlinx.coroutines.Dispatchers @@ -52,36 +51,25 @@ class ActiveStateGateTest { Dispatchers.resetMain() } - // ── TestData now exposes firstRtpPacketArrived ──────────────────────────── - private data class TestData( val call: Call, val publisherState: MutableStateFlow, - val subscriberState: MutableStateFlow, - val firstRtpPacketArrived: MutableStateFlow, ) private fun fakeCall( publisherState: MutableStateFlow = MutableStateFlow(PeerConnection.PeerConnectionState.NEW), - subscriberState: MutableStateFlow = - MutableStateFlow(PeerConnection.PeerConnectionState.NEW), - firstRtpPacketArrived: MutableStateFlow = MutableStateFlow(false), ): TestData { val publisher = mockk(relaxed = true) { every { state } returns publisherState } - val subscriber = mockk(relaxed = true) { - every { state } returns subscriberState - } val session = mockk { every { this@mockk.publisher } returns MutableStateFlow(publisher) - every { this@mockk.subscriber } returns MutableStateFlow(subscriber) } val call = mockk { every { this@mockk.session } returns MutableStateFlow(session) } - return TestData(call, publisherState, subscriberState, firstRtpPacketArrived) + return TestData(call, publisherState) } // ── 1. Non-ringing previous states ──────────────────────────────────────── @@ -93,12 +81,13 @@ class ActiveStateGateTest { coroutineScope = this, previousRingingStates = emptySet(), ) - val (call, _, _, _) = fakeCall() + val (call, _) = fakeCall() val transitioned = mutableListOf() sut.awaitAndTransition( currentRingingState = RingingState.Idle, call = call, + interceptor = DefaultRingingCallJoinInterceptor, onReady = { transitioned += Unit }, ) @@ -114,12 +103,13 @@ class ActiveStateGateTest { coroutineScope = this, previousRingingStates = setOf(RingingState.Incoming(false)), ) - val (call, _, _, _) = fakeCall() + val (call, _) = fakeCall() val transitioned = mutableListOf() sut.awaitAndTransition( currentRingingState = RingingState.Active, call = call, + interceptor = DefaultRingingCallJoinInterceptor, onReady = { transitioned += Unit }, ) @@ -127,13 +117,11 @@ class ActiveStateGateTest { } @Test - fun `transitions when both peers connect for Outgoing previous state`() = + fun `transitions when publisher connects for Outgoing previous state`() = runTest(testDispatcher) { val pubState: MutableStateFlow = MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val subState: MutableStateFlow = - MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val (call, _, _, _) = fakeCall(pubState, subState) + val (call, _) = fakeCall(pubState) val outgoingRingingState = RingingState.Outgoing(false) val sut = ActiveStateGate( @@ -146,20 +134,20 @@ class ActiveStateGateTest { sut.awaitAndTransition( currentRingingState = outgoingRingingState, call = call, + interceptor = DefaultRingingCallJoinInterceptor, onReady = { transitioned += Unit }, ) pubState.value = PeerConnection.PeerConnectionState.CONNECTED - subState.value = PeerConnection.PeerConnectionState.CONNECTED assertTrue(transitioned.size == 1) } // ── 4. Timeout ──────────────────────────────────────────────────────────── @Test - fun `still calls onReady after timeout even if peers never connect`() = + fun `still calls onReady after timeout even if publisher never connects`() = runTest(testDispatcher) { - val (call, _, _, _) = fakeCall() + val (call, _) = fakeCall() val incomingRingingState = RingingState.Incoming(false) val sut = ActiveStateGate( @@ -172,6 +160,7 @@ class ActiveStateGateTest { sut.awaitAndTransition( currentRingingState = incomingRingingState, call = call, + interceptor = DefaultRingingCallJoinInterceptor, onReady = { transitioned += Unit }, ) @@ -187,9 +176,7 @@ class ActiveStateGateTest { runTest(testDispatcher) { val pubState: MutableStateFlow = MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val subState: MutableStateFlow = - MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val (call, _, _, _) = fakeCall(pubState, subState) + val (call, _) = fakeCall(pubState) val incomingRingingState = RingingState.Incoming(false) val sut = ActiveStateGate( @@ -200,11 +187,20 @@ class ActiveStateGateTest { val transitioned = mutableListOf() val action = { transitioned += Unit } - sut.awaitAndTransition(incomingRingingState, call, onReady = action) - sut.awaitAndTransition(incomingRingingState, call, onReady = action) + sut.awaitAndTransition( + incomingRingingState, + call, + DefaultRingingCallJoinInterceptor, + onReady = action, + ) + sut.awaitAndTransition( + incomingRingingState, + call, + DefaultRingingCallJoinInterceptor, + onReady = action, + ) pubState.value = PeerConnection.PeerConnectionState.CONNECTED - subState.value = PeerConnection.PeerConnectionState.CONNECTED assertTrue(transitioned.size == 1) } @@ -214,7 +210,7 @@ class ActiveStateGateTest { @Test fun `cleanup cancels the observer job and onReady is never called`() = runTest(testDispatcher) { - val (call, pubState, subState, _) = fakeCall() + val (call, pubState) = fakeCall() val incomingRingingState = RingingState.Incoming(false) val sut = ActiveStateGate( @@ -227,13 +223,12 @@ class ActiveStateGateTest { sut.awaitAndTransition( currentRingingState = incomingRingingState, call = call, + interceptor = DefaultRingingCallJoinInterceptor, onReady = { transitioned += Unit }, ) sut.cleanup() - pubState.value = PeerConnection.PeerConnectionState.CONNECTED - subState.value = PeerConnection.PeerConnectionState.CONNECTED assertTrue(transitioned.isEmpty()) } @@ -243,9 +238,7 @@ class ActiveStateGateTest { runTest(testDispatcher) { val pubState: MutableStateFlow = MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val subState: MutableStateFlow = - MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val (call, _, _, _) = fakeCall(pubState, subState) + val (call, _) = fakeCall(pubState) val incomingRingingState = RingingState.Incoming(false) val sut = ActiveStateGate( @@ -255,12 +248,15 @@ class ActiveStateGateTest { ) val transitioned = mutableListOf() - sut.awaitAndTransition(incomingRingingState, call) { transitioned += Unit } + sut.awaitAndTransition(incomingRingingState, call, DefaultRingingCallJoinInterceptor) { + transitioned += Unit + } sut.cleanup() - sut.awaitAndTransition(incomingRingingState, call) { transitioned += Unit } + sut.awaitAndTransition(incomingRingingState, call, DefaultRingingCallJoinInterceptor) { + transitioned += Unit + } pubState.value = PeerConnection.PeerConnectionState.CONNECTED - subState.value = PeerConnection.PeerConnectionState.CONNECTED assertTrue(transitioned.size == 1) } @@ -273,14 +269,10 @@ class ActiveStateGateTest { val sessionFlow = MutableStateFlow(null) val pubState: MutableStateFlow = MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val subState: MutableStateFlow = - MutableStateFlow(PeerConnection.PeerConnectionState.NEW) val publisher = mockk { every { state } returns pubState } - val subscriber = mockk(relaxed = true) { every { state } returns subState } val session = mockk { every { this@mockk.publisher } returns MutableStateFlow(publisher) - every { this@mockk.subscriber } returns MutableStateFlow(subscriber) } val call = mockk { every { this@mockk.session } returns sessionFlow } @@ -292,10 +284,11 @@ class ActiveStateGateTest { ) val transitioned = mutableListOf() - sut.awaitAndTransition(incomingRingingState, call) { transitioned += Unit } + sut.awaitAndTransition(incomingRingingState, call, DefaultRingingCallJoinInterceptor) { + transitioned += Unit + } pubState.value = PeerConnection.PeerConnectionState.CONNECTED - subState.value = PeerConnection.PeerConnectionState.CONNECTED assertTrue(transitioned.isEmpty()) sessionFlow.value = session @@ -309,9 +302,7 @@ class ActiveStateGateTest { runTest(StandardTestDispatcher()) { val pubState: MutableStateFlow = MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val subState: MutableStateFlow = - MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val (call, _, _, _) = fakeCall(pubState, subState) + val (call, _) = fakeCall(pubState) val incomingRingingState = RingingState.Incoming(false) val sut = ActiveStateGate( @@ -321,14 +312,13 @@ class ActiveStateGateTest { ) val transitioned = mutableListOf() - sut.awaitAndTransition(incomingRingingState, call) { transitioned += Unit } + sut.awaitAndTransition(incomingRingingState, call, DefaultRingingCallJoinInterceptor) { + transitioned += Unit + } - // Peers connect but cleanup cancels before the coroutine resumes pubState.value = PeerConnection.PeerConnectionState.CONNECTED - subState.value = PeerConnection.PeerConnectionState.CONNECTED - sut.cleanup() // cancels before isActive check runs + sut.cleanup() - // Now let any pending coroutine work drain — onReady must not fire advanceUntilIdle() assertTrue(transitioned.isEmpty()) @@ -340,19 +330,14 @@ class ActiveStateGateTest { strategy: TransitionToRingingStateStrategy, pubState: MutableStateFlow = MutableStateFlow(PeerConnection.PeerConnectionState.NEW), - subState: MutableStateFlow = - MutableStateFlow(PeerConnection.PeerConnectionState.NEW), - firstRtpPacketArrived: MutableStateFlow = MutableStateFlow(false), block: suspend TestScope.( call: Call, sut: ActiveStateGate, transitioned: MutableList, pubState: MutableStateFlow, - subState: MutableStateFlow, - firstRtpPacketArrived: MutableStateFlow, ) -> Unit, ) = runTest(testDispatcher) { - val testData = fakeCall(pubState, subState, firstRtpPacketArrived) + val testData = fakeCall(pubState) val incoming = RingingState.Incoming(false) val sut = ActiveStateGate( coroutineScope = this, @@ -361,24 +346,25 @@ class ActiveStateGateTest { timeoutMs = 5_000L, ) val transitioned = mutableListOf() - sut.awaitAndTransition(incoming, testData.call) { transitioned += Unit } - block(testData.call, sut, transitioned, pubState, subState, firstRtpPacketArrived) + sut.awaitAndTransition(incoming, testData.call, DefaultRingingCallJoinInterceptor) { + transitioned += Unit + } + block(testData.call, sut, transitioned, pubState) } @Test fun `LEGACY_BEHAVIOUR – transition still fires without timeout fallback`() = runStrategyTest( TransitionToRingingStateStrategy.LEGACY_BEHAVIOUR, - ) { _, _, transitioned, _, _, _ -> + ) { _, _, transitioned, _ -> assertTrue(transitioned.size == 1) } @Test - fun `PUBLISHER_CONNECTED – subscriber alone is not enough`() = + fun `PUBLISHER_CONNECTED – transitions when publisher connects`() = runStrategyTest( TransitionToRingingStateStrategy.PUBLISHER_CONNECTED, - ) { _, _, transitioned, pubState, subState, _ -> - subState.value = PeerConnection.PeerConnectionState.CONNECTED + ) { _, _, transitioned, pubState -> assertTrue(transitioned.isEmpty()) pubState.value = PeerConnection.PeerConnectionState.CONNECTED @@ -392,7 +378,7 @@ class ActiveStateGateTest { runTest(testDispatcher) { val pubState: MutableStateFlow = MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val (call, _, _, _) = fakeCall(pubState) + val (call, _) = fakeCall(pubState) val interceptorCalled = mutableListOf() val interceptor = object : RingingCallJoinInterceptor { @@ -421,7 +407,7 @@ class ActiveStateGateTest { runTest(StandardTestDispatcher()) { val pubState: MutableStateFlow = MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val (call, _, _, _) = fakeCall(pubState) + val (call, _) = fakeCall(pubState) val interceptor = object : RingingCallJoinInterceptor { override suspend fun callReadyToJoinWithTimeout(call: Call) { @@ -439,11 +425,8 @@ class ActiveStateGateTest { sut.awaitAndTransition(incoming, call, interceptor) { transitioned += Unit } pubState.value = PeerConnection.PeerConnectionState.CONNECTED - // Advance just enough for the peer connection detection to process, - // but not enough to drain the interceptor's delay(1_000L) advanceTimeBy(10L) - // onReady not yet called — interceptor is still suspending assertTrue(transitioned.isEmpty()) advanceTimeBy(1_100L) @@ -455,11 +438,11 @@ class ActiveStateGateTest { runTest(StandardTestDispatcher()) { val pubState: MutableStateFlow = MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val (call, _, _, _) = fakeCall(pubState) + val (call, _) = fakeCall(pubState) val interceptor = object : RingingCallJoinInterceptor { override suspend fun callReadyToJoinWithTimeout(call: Call) { - delay(Long.MAX_VALUE) // never returns on its own + delay(Long.MAX_VALUE) } } @@ -473,13 +456,11 @@ class ActiveStateGateTest { sut.awaitAndTransition(incoming, call, interceptor) { transitioned += Unit } pubState.value = PeerConnection.PeerConnectionState.CONNECTED - // Advance past the 100ms peer connection timeout so the interceptor starts, - // but not past the 5s interceptor timeout advanceTimeBy(200L) assertTrue(transitioned.isEmpty()) - advanceTimeBy(5_100L) // past 5s interceptor timeout + advanceTimeBy(5_100L) assertTrue(transitioned.size == 1) } @@ -488,7 +469,7 @@ class ActiveStateGateTest { runTest(testDispatcher) { val pubState: MutableStateFlow = MutableStateFlow(PeerConnection.PeerConnectionState.NEW) - val (call, _, _, _) = fakeCall(pubState) + val (call, _) = fakeCall(pubState) val interceptor = object : RingingCallJoinInterceptor { override suspend fun callReadyToJoinWithTimeout(call: Call) { @@ -520,16 +501,16 @@ class ActiveStateGateTest { } } - val (call, _, _, _) = fakeCall() + val (call, _) = fakeCall() val sut = ActiveStateGate( coroutineScope = this, - previousRingingStates = emptySet(), // no incoming/outgoing history + previousRingingStates = emptySet(), ) val transitioned = mutableListOf() sut.awaitAndTransition(RingingState.Idle, call, interceptor) { transitioned += Unit } assertTrue(interceptorCalled.isEmpty()) - assertEquals(1, transitioned.size) // onReady called directly + assertEquals(1, transitioned.size) } } From 8eeb149ba34571eca478ea31768362d1674ce453 Mon Sep 17 00:00:00 2001 From: rahullohra Date: Tue, 5 May 2026 14:18:41 +0530 Subject: [PATCH 4/6] feat: update logic to run in parallel --- .../video/android/core/ActiveStateGate.kt | 14 ++++++--- .../video/android/core/ActiveStateGateTest.kt | 30 +++++++++++++++++++ 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt index 9f947283b6..dc09ed63ce 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt @@ -20,6 +20,7 @@ import io.getstream.log.taggedLogger import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job +import kotlinx.coroutines.async import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filterNotNull @@ -79,13 +80,18 @@ internal class ActiveStateGate( peerConnectionObserverJob = coroutineScope.launch { val start = System.currentTimeMillis() - val result = withTimeoutOrNull(timeoutMs) { buildConnectionFlow(call).first() } - val duration = System.currentTimeMillis() - start - logConnectionResult(result, duration) + val peerWait = async { + withTimeoutOrNull(timeoutMs) { buildConnectionFlow(call).first() } + } + val interceptorWait = async { + invokeInterceptorSafely(call, interceptor) + } + val peerResult = peerWait.await() + interceptorWait.await() + logConnectionResult(peerResult, System.currentTimeMillis() - start) if (isActive) { - invokeInterceptorSafely(call, interceptor) onReady() cleanup() } diff --git a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt index 7ce9ee85c1..4a7c17b2f4 100644 --- a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt +++ b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt @@ -491,6 +491,36 @@ class ActiveStateGateTest { assertEquals(1, transitioned.size) } + @Test + fun `interceptor and publisher wait run in parallel`() = + runTest(StandardTestDispatcher()) { + val pubState: MutableStateFlow = + MutableStateFlow(PeerConnection.PeerConnectionState.NEW) + val (call, _) = fakeCall(pubState) + + val interceptor = object : RingingCallJoinInterceptor { + override suspend fun callReadyToJoinWithTimeout(call: Call) { + delay(2_000L) + } + } + + val incoming = RingingState.Incoming(false) + val sut = ActiveStateGate( + coroutineScope = this, + previousRingingStates = setOf(incoming), + timeoutMs = 5_000L, + ) + val transitioned = mutableListOf() + + sut.awaitAndTransition(incoming, call, interceptor) { transitioned += Unit } + + advanceTimeBy(2_000L) + pubState.value = PeerConnection.PeerConnectionState.CONNECTED + advanceTimeBy(50L) + + assertEquals(1, transitioned.size) + } + @Test fun `interceptor is NOT called for non-ringing Active transitions`() = runTest(testDispatcher) { From eda3192071cd6bb08a15ecf108d7885735037a3e Mon Sep 17 00:00:00 2001 From: rahullohra Date: Tue, 5 May 2026 14:52:38 +0530 Subject: [PATCH 5/6] feat: remove DefaultRingingCallJoinInterceptor --- .../api/stream-video-android-core.api | 5 ---- .../video/android/core/ActiveStateGate.kt | 10 ++++---- .../io/getstream/video/android/core/Call.kt | 2 +- .../getstream/video/android/core/CallState.kt | 2 +- .../core/RingingCallJoinInterceptor.kt | 9 ------- .../video/android/core/ActiveStateGateTest.kt | 24 +++++++++---------- 6 files changed, 19 insertions(+), 33 deletions(-) diff --git a/stream-video-android-core/api/stream-video-android-core.api b/stream-video-android-core/api/stream-video-android-core.api index 53ee06b506..0ec86b2729 100644 --- a/stream-video-android-core/api/stream-video-android-core.api +++ b/stream-video-android-core/api/stream-video-android-core.api @@ -8951,11 +8951,6 @@ public final class io/getstream/video/android/core/CreateCallOptions { public fun toString ()Ljava/lang/String; } -public final class io/getstream/video/android/core/DefaultRingingCallJoinInterceptor : io/getstream/video/android/core/RingingCallJoinInterceptor { - public static final field INSTANCE Lio/getstream/video/android/core/DefaultRingingCallJoinInterceptor; - public fun callReadyToJoinWithTimeout (Lio/getstream/video/android/core/Call;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; -} - public abstract class io/getstream/video/android/core/DeviceStatus { } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt index dc09ed63ce..a8d177d220 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt @@ -47,7 +47,7 @@ internal class ActiveStateGate( internal fun awaitAndTransition( currentRingingState: RingingState, call: Call, - interceptor: RingingCallJoinInterceptor, + interceptor: RingingCallJoinInterceptor?, onReady: () -> Unit, ) { logger.d { "[awaitAndTransition], ringingState: $currentRingingState" } @@ -73,7 +73,7 @@ internal class ActiveStateGate( private fun observePeerConnection( call: Call, - interceptor: RingingCallJoinInterceptor, + interceptor: RingingCallJoinInterceptor?, onReady: () -> Unit, ) { if (peerConnectionObserverJob?.isActive == true) return @@ -84,11 +84,11 @@ internal class ActiveStateGate( val peerWait = async { withTimeoutOrNull(timeoutMs) { buildConnectionFlow(call).first() } } - val interceptorWait = async { - invokeInterceptorSafely(call, interceptor) + val interceptorWait = interceptor?.let { + async { invokeInterceptorSafely(call, it) } } val peerResult = peerWait.await() - interceptorWait.await() + interceptorWait?.await() logConnectionResult(peerResult, System.currentTimeMillis() - start) if (isActive) { diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt index db65a11a7a..0fcbc0f04c 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt @@ -544,7 +544,7 @@ public class Call( ring: Boolean = false, notify: Boolean = false, hintHighScaleLivestreamPublisher: Boolean? = null, - ringingCallJoinInterceptor: RingingCallJoinInterceptor = DefaultRingingCallJoinInterceptor, + ringingCallJoinInterceptor: RingingCallJoinInterceptor? = null, ): Result { logger.d { "[join] #ringing; #track; create: $create, ring: $ring, notify: $notify, createOptions: $createOptions" diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt index ce23fab7f8..0cdb1a516a 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt @@ -751,7 +751,7 @@ public class CallState( internal var incomingNotificationData = IncomingNotificationData(emptyMap()) private val ringingLogger by taggedLogger("RingingState") - internal var ringingCallJoinInterceptor: RingingCallJoinInterceptor = DefaultRingingCallJoinInterceptor + internal var ringingCallJoinInterceptor: RingingCallJoinInterceptor? = null fun handleEvent(event: VideoEvent) { logger.d { "[handleEvent] ${event::class.java.name.split(".").last()}" } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/RingingCallJoinInterceptor.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/RingingCallJoinInterceptor.kt index 83f1560509..cf3c090b13 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/RingingCallJoinInterceptor.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/RingingCallJoinInterceptor.kt @@ -22,8 +22,6 @@ package io.getstream.video.android.core * Implement this to insert custom logic (e.g. waiting for user confirmation) between * the publisher peer connection becoming ready and the call going active. * Has no effect on non-ringing joins (livestream, direct join). - * - * @see DefaultRingingCallJoinInterceptor */ public interface RingingCallJoinInterceptor { @@ -35,10 +33,3 @@ public interface RingingCallJoinInterceptor { */ public suspend fun callReadyToJoinWithTimeout(call: Call) } - -/** - * Default implementation that proceeds to [RingingState.Active] immediately. - */ -public object DefaultRingingCallJoinInterceptor : RingingCallJoinInterceptor { - override suspend fun callReadyToJoinWithTimeout(call: Call) {} -} diff --git a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt index 4a7c17b2f4..878ac9672a 100644 --- a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt +++ b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt @@ -87,7 +87,7 @@ class ActiveStateGateTest { sut.awaitAndTransition( currentRingingState = RingingState.Idle, call = call, - interceptor = DefaultRingingCallJoinInterceptor, + interceptor = null, onReady = { transitioned += Unit }, ) @@ -109,7 +109,7 @@ class ActiveStateGateTest { sut.awaitAndTransition( currentRingingState = RingingState.Active, call = call, - interceptor = DefaultRingingCallJoinInterceptor, + interceptor = null, onReady = { transitioned += Unit }, ) @@ -134,7 +134,7 @@ class ActiveStateGateTest { sut.awaitAndTransition( currentRingingState = outgoingRingingState, call = call, - interceptor = DefaultRingingCallJoinInterceptor, + interceptor = null, onReady = { transitioned += Unit }, ) @@ -160,7 +160,7 @@ class ActiveStateGateTest { sut.awaitAndTransition( currentRingingState = incomingRingingState, call = call, - interceptor = DefaultRingingCallJoinInterceptor, + interceptor = null, onReady = { transitioned += Unit }, ) @@ -190,13 +190,13 @@ class ActiveStateGateTest { sut.awaitAndTransition( incomingRingingState, call, - DefaultRingingCallJoinInterceptor, + null, onReady = action, ) sut.awaitAndTransition( incomingRingingState, call, - DefaultRingingCallJoinInterceptor, + null, onReady = action, ) @@ -223,7 +223,7 @@ class ActiveStateGateTest { sut.awaitAndTransition( currentRingingState = incomingRingingState, call = call, - interceptor = DefaultRingingCallJoinInterceptor, + interceptor = null, onReady = { transitioned += Unit }, ) @@ -248,11 +248,11 @@ class ActiveStateGateTest { ) val transitioned = mutableListOf() - sut.awaitAndTransition(incomingRingingState, call, DefaultRingingCallJoinInterceptor) { + sut.awaitAndTransition(incomingRingingState, call, null) { transitioned += Unit } sut.cleanup() - sut.awaitAndTransition(incomingRingingState, call, DefaultRingingCallJoinInterceptor) { + sut.awaitAndTransition(incomingRingingState, call, null) { transitioned += Unit } @@ -284,7 +284,7 @@ class ActiveStateGateTest { ) val transitioned = mutableListOf() - sut.awaitAndTransition(incomingRingingState, call, DefaultRingingCallJoinInterceptor) { + sut.awaitAndTransition(incomingRingingState, call, null) { transitioned += Unit } @@ -312,7 +312,7 @@ class ActiveStateGateTest { ) val transitioned = mutableListOf() - sut.awaitAndTransition(incomingRingingState, call, DefaultRingingCallJoinInterceptor) { + sut.awaitAndTransition(incomingRingingState, call, null) { transitioned += Unit } @@ -346,7 +346,7 @@ class ActiveStateGateTest { timeoutMs = 5_000L, ) val transitioned = mutableListOf() - sut.awaitAndTransition(incoming, testData.call, DefaultRingingCallJoinInterceptor) { + sut.awaitAndTransition(incoming, testData.call, null) { transitioned += Unit } block(testData.call, sut, transitioned, pubState) From 568fc81e22c422b0a4a036502ca4529f661ba96c Mon Sep 17 00:00:00 2001 From: rahullohra Date: Tue, 5 May 2026 16:38:52 +0530 Subject: [PATCH 6/6] chore: refactor the interface name --- .../api/stream-video-android-core.api | 8 +++--- .../video/android/core/ActiveStateGate.kt | 12 +++++---- .../io/getstream/video/android/core/Call.kt | 4 +-- .../getstream/video/android/core/CallState.kt | 4 +-- ...kt => RingingCallActivationInterceptor.kt} | 4 +-- .../video/android/core/ActiveStateGateTest.kt | 26 +++++++++---------- 6 files changed, 30 insertions(+), 28 deletions(-) rename stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/{RingingCallJoinInterceptor.kt => RingingCallActivationInterceptor.kt} (91%) diff --git a/stream-video-android-core/api/stream-video-android-core.api b/stream-video-android-core/api/stream-video-android-core.api index 0ec86b2729..c20fd132fa 100644 --- a/stream-video-android-core/api/stream-video-android-core.api +++ b/stream-video-android-core/api/stream-video-android-core.api @@ -8593,8 +8593,8 @@ public final class io/getstream/video/android/core/Call { public final fun isPinnedParticipant (Ljava/lang/String;)Z public final fun isServerPin (Ljava/lang/String;)Z public final fun isVideoEnabled ()Z - public final fun join (ZLio/getstream/video/android/core/CreateCallOptions;ZZLjava/lang/Boolean;Lio/getstream/video/android/core/RingingCallJoinInterceptor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static synthetic fun join$default (Lio/getstream/video/android/core/Call;ZLio/getstream/video/android/core/CreateCallOptions;ZZLjava/lang/Boolean;Lio/getstream/video/android/core/RingingCallJoinInterceptor;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public final fun join (ZLio/getstream/video/android/core/CreateCallOptions;ZZLjava/lang/Boolean;Lio/getstream/video/android/core/RingingCallActivationInterceptor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun join$default (Lio/getstream/video/android/core/Call;ZLio/getstream/video/android/core/CreateCallOptions;ZZLjava/lang/Boolean;Lio/getstream/video/android/core/RingingCallActivationInterceptor;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public final fun joinAndRing (Ljava/util/List;Lio/getstream/video/android/core/CreateCallOptions;ZLkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun joinAndRing$default (Lio/getstream/video/android/core/Call;Ljava/util/List;Lio/getstream/video/android/core/CreateCallOptions;ZLkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public final fun kickUser (Ljava/lang/String;ZLkotlin/coroutines/Continuation;)Ljava/lang/Object; @@ -9317,8 +9317,8 @@ public final class io/getstream/video/android/core/RealtimeConnection$Reconnecti public fun toString ()Ljava/lang/String; } -public abstract interface class io/getstream/video/android/core/RingingCallJoinInterceptor { - public abstract fun callReadyToJoinWithTimeout (Lio/getstream/video/android/core/Call;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +public abstract interface class io/getstream/video/android/core/RingingCallActivationInterceptor { + public abstract fun callReadyToActivateWithTimeout (Lio/getstream/video/android/core/Call;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public abstract interface class io/getstream/video/android/core/RingingState { diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt index a8d177d220..044a197d17 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ActiveStateGate.kt @@ -47,7 +47,7 @@ internal class ActiveStateGate( internal fun awaitAndTransition( currentRingingState: RingingState, call: Call, - interceptor: RingingCallJoinInterceptor?, + interceptor: RingingCallActivationInterceptor?, onReady: () -> Unit, ) { logger.d { "[awaitAndTransition], ringingState: $currentRingingState" } @@ -73,7 +73,7 @@ internal class ActiveStateGate( private fun observePeerConnection( call: Call, - interceptor: RingingCallJoinInterceptor?, + interceptor: RingingCallActivationInterceptor?, onReady: () -> Unit, ) { if (peerConnectionObserverJob?.isActive == true) return @@ -109,15 +109,17 @@ internal class ActiveStateGate( .map { } } - private suspend fun invokeInterceptorSafely(call: Call, interceptor: RingingCallJoinInterceptor) { + private suspend fun invokeInterceptorSafely(call: Call, interceptor: RingingCallActivationInterceptor) { try { withTimeoutOrNull(INTERCEPTOR_TIMEOUT_MS) { - interceptor.callReadyToJoinWithTimeout(call) + interceptor.callReadyToActivateWithTimeout(call) } } catch (e: CancellationException) { throw e } catch (e: Exception) { - logger.e(e) { "[RingingCallJoinInterceptor] interceptor threw, proceeding to Active" } + logger.e( + e, + ) { "[RingingCallActivationInterceptor] interceptor threw, proceeding to Active" } } } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt index 0fcbc0f04c..18838b9892 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt @@ -544,7 +544,7 @@ public class Call( ring: Boolean = false, notify: Boolean = false, hintHighScaleLivestreamPublisher: Boolean? = null, - ringingCallJoinInterceptor: RingingCallJoinInterceptor? = null, + ringingCallActivationInterceptor: RingingCallActivationInterceptor? = null, ): Result { logger.d { "[join] #ringing; #track; create: $create, ring: $ring, notify: $notify, createOptions: $createOptions" @@ -568,7 +568,7 @@ public class Call( // Ensure factory is created with the current audioBitrateProfile before joining ensureFactoryMatchesAudioProfile() - this.state.ringingCallJoinInterceptor = ringingCallJoinInterceptor + this.state.ringingCallActivationInterceptor = ringingCallActivationInterceptor // the join flow should retry up to 3 times // if the error is not permanent diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt index 0cdb1a516a..1e9b73560e 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt @@ -751,7 +751,7 @@ public class CallState( internal var incomingNotificationData = IncomingNotificationData(emptyMap()) private val ringingLogger by taggedLogger("RingingState") - internal var ringingCallJoinInterceptor: RingingCallJoinInterceptor? = null + internal var ringingCallActivationInterceptor: RingingCallActivationInterceptor? = null fun handleEvent(event: VideoEvent) { logger.d { "[handleEvent] ${event::class.java.name.split(".").last()}" } @@ -1386,7 +1386,7 @@ public class CallState( activeStateGate.awaitAndTransition( ringingState.value, call, - ringingCallJoinInterceptor, + ringingCallActivationInterceptor, ) { _ringingState.value = state activeStateGate.cleanup() diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/RingingCallJoinInterceptor.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/RingingCallActivationInterceptor.kt similarity index 91% rename from stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/RingingCallJoinInterceptor.kt rename to stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/RingingCallActivationInterceptor.kt index cf3c090b13..9ed934531a 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/RingingCallJoinInterceptor.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/RingingCallActivationInterceptor.kt @@ -23,7 +23,7 @@ package io.getstream.video.android.core * the publisher peer connection becoming ready and the call going active. * Has no effect on non-ringing joins (livestream, direct join). */ -public interface RingingCallJoinInterceptor { +public interface RingingCallActivationInterceptor { /** * Called when the SDK is ready to transition to [RingingState.Active]. @@ -31,5 +31,5 @@ public interface RingingCallJoinInterceptor { * * The SDK enforces a 5-second maximum — the transition proceeds automatically on timeout. */ - public suspend fun callReadyToJoinWithTimeout(call: Call) + public suspend fun callReadyToActivateWithTimeout(call: Call) } diff --git a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt index 878ac9672a..fd411a7555 100644 --- a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt +++ b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/ActiveStateGateTest.kt @@ -371,7 +371,7 @@ class ActiveStateGateTest { assertTrue(transitioned.size == 1) } - // ── RingingCallJoinInterceptor tests ────────────────────────────────────── + // ── RingingCallActivationInterceptor tests ────────────────────────────────────── @Test fun `interceptor is called before onReady for ringing call`() = @@ -381,8 +381,8 @@ class ActiveStateGateTest { val (call, _) = fakeCall(pubState) val interceptorCalled = mutableListOf() - val interceptor = object : RingingCallJoinInterceptor { - override suspend fun callReadyToJoinWithTimeout(call: Call) { + val interceptor = object : RingingCallActivationInterceptor { + override suspend fun callReadyToActivateWithTimeout(call: Call) { interceptorCalled += Unit } } @@ -409,8 +409,8 @@ class ActiveStateGateTest { MutableStateFlow(PeerConnection.PeerConnectionState.NEW) val (call, _) = fakeCall(pubState) - val interceptor = object : RingingCallJoinInterceptor { - override suspend fun callReadyToJoinWithTimeout(call: Call) { + val interceptor = object : RingingCallActivationInterceptor { + override suspend fun callReadyToActivateWithTimeout(call: Call) { delay(1_000L) } } @@ -440,8 +440,8 @@ class ActiveStateGateTest { MutableStateFlow(PeerConnection.PeerConnectionState.NEW) val (call, _) = fakeCall(pubState) - val interceptor = object : RingingCallJoinInterceptor { - override suspend fun callReadyToJoinWithTimeout(call: Call) { + val interceptor = object : RingingCallActivationInterceptor { + override suspend fun callReadyToActivateWithTimeout(call: Call) { delay(Long.MAX_VALUE) } } @@ -471,8 +471,8 @@ class ActiveStateGateTest { MutableStateFlow(PeerConnection.PeerConnectionState.NEW) val (call, _) = fakeCall(pubState) - val interceptor = object : RingingCallJoinInterceptor { - override suspend fun callReadyToJoinWithTimeout(call: Call) { + val interceptor = object : RingingCallActivationInterceptor { + override suspend fun callReadyToActivateWithTimeout(call: Call) { throw RuntimeException("interceptor error") } } @@ -498,8 +498,8 @@ class ActiveStateGateTest { MutableStateFlow(PeerConnection.PeerConnectionState.NEW) val (call, _) = fakeCall(pubState) - val interceptor = object : RingingCallJoinInterceptor { - override suspend fun callReadyToJoinWithTimeout(call: Call) { + val interceptor = object : RingingCallActivationInterceptor { + override suspend fun callReadyToActivateWithTimeout(call: Call) { delay(2_000L) } } @@ -525,8 +525,8 @@ class ActiveStateGateTest { fun `interceptor is NOT called for non-ringing Active transitions`() = runTest(testDispatcher) { val interceptorCalled = mutableListOf() - val interceptor = object : RingingCallJoinInterceptor { - override suspend fun callReadyToJoinWithTimeout(call: Call) { + val interceptor = object : RingingCallActivationInterceptor { + override suspend fun callReadyToActivateWithTimeout(call: Call) { interceptorCalled += Unit } }