fix(server): evict stale GET SSE stream mapping on reconnect#716
fix(server): evict stale GET SSE stream mapping on reconnect#716amr wants to merge 1 commit intomodelcontextprotocol:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR fixes Streamable HTTP GET SSE reconnect behavior by evicting/replacing a stale standalone SSE stream mapping on reconnect, avoiding a race where async cleanup (invokeOnCompletion) leaves a stale STANDALONE_SSE_STREAM_ID entry that blocks a new GET SSE stream.
Changes:
- Replace the prior “reject second GET SSE with 409” logic with a close-and-replace strategy for the standalone GET SSE stream.
- Add new JVM unit tests covering reconnect and concurrent GET SSE replacement behavior.
- Add new CIO-based integration tests validating reconnect/replacement over real HTTP.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt | Updates GET SSE handling to evict/close an existing standalone stream mapping before establishing the new stream. |
| kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransportTest.kt | Adds unit tests for reconnect and concurrent replacement semantics for GET SSE streams. |
| integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/streamablehttp/StreamableHttpSseReconnectTest.kt | Adds integration tests against an embedded CIO server for reconnect and replacement scenarios. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| streamsMapping.remove(STANDALONE_SSE_STREAM_ID)?.let { existingStream -> | ||
| // Close the previous SSE stream. This handles both stale streams | ||
| // (client disconnected but cleanup hasn't fired) and live streams | ||
| // (client opened a new connection, replacing the old one). | ||
| try { | ||
| existingStream.session?.close() | ||
| } catch (_: Exception) { | ||
| // Ignore — the old stream may already be closed. | ||
| } | ||
| } |
| // (client disconnected but cleanup hasn't fired) and live streams | ||
| // (client opened a new connection, replacing the old one). | ||
| try { | ||
| existingStream.session?.close() |
| // Step 3: While the first stream is open, open a second GET. | ||
| // The new stream replaces the old one (last-writer-wins). | ||
| client.prepareGet(mcpPath) { | ||
| header(HttpHeaders.Accept, ContentType.Text.EventStream.toString()) | ||
| header(MCP_SESSION_ID_HEADER, sessionId) | ||
| header("mcp-protocol-version", LATEST_PROTOCOL_VERSION) | ||
| }.execute { secondResponse -> | ||
| secondResponse.status shouldBe HttpStatusCode.OK | ||
| secondResponse.headers[MCP_SESSION_ID_HEADER] shouldBe sessionId | ||
|
|
||
| val channel = secondResponse.bodyAsChannel() | ||
| val firstLine = channel.readUTF8Line() | ||
| firstLine.shouldNotBeNull() | ||
| channel.isClosedForRead shouldBe false | ||
| } |
| // Step 3: Immediately reconnect. The transport should detect that | ||
| // the previous stream's coroutine is cancelled and evict the stale | ||
| // mapping, allowing the new stream to succeed. |
devcrocod
left a comment
There was a problem hiding this comment.
Thank you for investigating the bug and for your contribution
In the PR, you point out a difference from ts sdk implementation, but this PR changes the behavior contract in case of a conflict.
I think it would be better to make cleanup synchronous instead. This way, we can preserve the existing contract while fixing the race condition between the old invokeOnCompletion and the new put. WDYT?
| if (STANDALONE_SSE_STREAM_ID in streamsMapping) { | ||
| call.reject( | ||
| HttpStatusCode.Conflict, | ||
| RPCError.ErrorCode.CONNECTION_CLOSED, | ||
| "Conflict: Only one SSE stream is allowed per session", | ||
| ) | ||
| return |
There was a problem hiding this comment.
By removing the reject with a 409 status here, you are changing the contract
There was a problem hiding this comment.
Thanks for taking a look @devcrocod
I agree with you, that was my first intuition but after diving in I learned:
- The actual / "on the wire" behavior of the Kotlin SDK today is that it doesn't actually return 409, the client gets a 200 (see additional context section + tests)
- I couldn't actually find a reference to this 409 requirement in the spec, but I did find it in the typescript implementation, so I ran a comparison on 3 more SDKs: Java, Go & C#, and I found that:
- Java: Replaces stream (no 409)
- Go: 409 Conflict
- C#: 400 Bad Request
| "Conflict: Only one SSE stream is allowed per session", | ||
| ) | ||
| return | ||
| streamsMapping.remove(STANDALONE_SSE_STREAM_ID)?.let { existingStream -> |
There was a problem hiding this comment.
Since there is no longer a return, execution reaches invokeOnCompletion, and as I understand it, this may introduce a race condition
When a GET SSE stream disconnects and the client reconnects, the
previous stream's STANDALONE_SSE_STREAM_ID entry in streamsMapping
may still be present because cleanup via invokeOnCompletion is
asynchronous. The client gets a 200 OK (headers already committed
by Ktor's sse {} handler) with an immediately-closed empty stream,
causing a retry loop.
Changes:
- Explicitly close the old SSE session before establishing a new
stream. If the old session is already dead this is a no-op.
- Replace invokeOnCompletion with try/finally around
awaitCancellation() for faster cleanup during cancellation.
- Add identity-based removal (=== check) in finally and
emitOnStream so old stream cleanup cannot evict a replacement's
mapping.
- Rethrow CancellationException instead of swallowing it.
Fixes modelcontextprotocol#715
548abcb to
1443117
Compare
Motivation and Context
When a GET SSE stream disconnects and the client reconnects on the same session, the new GET is silently rejected because the previous stream's
STANDALONE_SSE_STREAM_IDentry instreamsMappinghasn't been cleaned up yet. Cleanup viainvokeOnCompletionis asynchronous — there is a race window where the stale entry persists and blocks reconnection.Additionally, Ktor's
sse {}handler commits 200 OK headers beforehandleGetRequestruns, so the intendedcall.reject(HttpStatusCode.Conflict, ...)cannot change the already-committed HTTP status. On all tested engines (test host, CIO, Netty), clients receive a 200 OK with an immediately-closed empty SSE stream rather than a 409 Conflict. Server-sideCallLoggingreports 409 (from the response object, not the wire status), which is misleading.What this PR does
Explicitly closes the old SSE session before establishing a new stream. If the old session is already dead,
close()is a no-op. This is the synchronous cleanup that the asyncinvokeOnCompletioncouldn't provide.Replaces
invokeOnCompletionwithtry/finallyaroundawaitCancellation()— cleanup runs during cancellation propagation (earlier in the lifecycle) rather than after job completion. An identity check (=== newContext) ensures only this stream's entry is removed, not a replacement's.Adds identity-based removal in
emitOnStream— a failedsend()on an old stream can no longer accidentally evict the new stream's mapping.Rethrows
CancellationExceptioninstead of swallowing it, preserving structured concurrency.The TypeScript reference implementation avoids this race via synchronous
ReadableStream.cancel()cleanup. The Kotlin coroutine equivalent (invokeOnCompletion) is inherently async. The Go SDK usesdefer stream.release()under a mutex for the same reason. The Java SDK uses unconditionalAtomicReference.set()replacement withcompareAndExchangeon close.Fixes #715
How Has This Been Tested?
All tests run against latest
main(bf8dc6d). ktlint and detekt pass clean.Breaking Changes
The server no longer returns 409 Conflict for a second GET SSE stream on the same session. Instead, the old stream is explicitly closed and the new stream takes over. The 409 was already unreachable as an HTTP status — Ktor's
sse {}handler commits 200 OK before the transport handler runs on all tested engines (test host, CIO, Netty), so clients never received 409 on the wire.Types of changes
Checklist
Additional context
This bug likely became reachable after #681 (April 10) fixed the Netty
appendSseHeaderscrash that previously prevented GET SSE from working at all on Netty.Cross-SDK comparison of how each SDK handles a second GET SSE on the same session:
ReadableStream.cancel()defer stream.release()under mutexAtomicReference.set()+compareAndExchangeon closeinvokeOnCompletiontry/finally+ identity-based removal