fix(server): evict stale GET SSE stream mapping on reconnect #716

Open
amr wants to merge 1 commit into modelcontextprotocol:main from amr:fix/stale-sse-stream-mapping

amr commented Apr 24, 2026

Motivation and Context

When a GET SSE stream disconnects and the client reconnects on the same session, the new GET is silently rejected because the previous stream's STANDALONE_SSE_STREAM_ID entry in streamsMapping hasn't been cleaned up yet. Cleanup via invokeOnCompletion is asynchronous — there is a race window where the stale entry persists and blocks reconnection.
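The race can be reproduced in miniature without Ktor or coroutines. The sketch below is a simplified model, not the transport's code: `RacyTransport`, `FakeStream`, the queue of pending callbacks, and the constant's value are all hypothetical stand-ins. It only shows why a presence check combined with deferred cleanup rejects a reconnect that arrives inside the window.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Placeholder value; the SDK's actual constant may differ.
const val STANDALONE_SSE_STREAM_ID = "standalone"

// Hypothetical stand-in for the transport's per-stream state.
class FakeStream(val label: String)

class RacyTransport {
    val streamsMapping = ConcurrentHashMap<String, FakeStream>()

    // Cleanup that has been scheduled but has not run yet, standing in
    // for a completion handler that fires some time after the disconnect.
    private val pendingCleanup = mutableListOf<() -> Unit>()

    /** Pre-fix behavior: refuse the GET if any standalone entry exists. */
    fun openGet(stream: FakeStream): Boolean {
        if (streamsMapping.containsKey(STANDALONE_SSE_STREAM_ID)) return false
        streamsMapping[STANDALONE_SSE_STREAM_ID] = stream
        return true
    }

    /** The client went away; removal of the mapping is only scheduled. */
    fun disconnect() {
        pendingCleanup.add { streamsMapping.remove(STANDALONE_SSE_STREAM_ID) }
    }

    /** The deferred cleanup finally runs. */
    fun runPendingCleanup() {
        pendingCleanup.forEach { it() }
        pendingCleanup.clear()
    }
}

fun demoStaleMapping(): List<Boolean> {
    val transport = RacyTransport()
    val first = transport.openGet(FakeStream("first"))
    transport.disconnect()
    // Reconnect inside the window: the stale entry is still present.
    val duringRace = transport.openGet(FakeStream("reconnect"))
    transport.runPendingCleanup()
    val afterCleanup = transport.openGet(FakeStream("retry"))
    return listOf(first, duringRace, afterCleanup) // [true, false, true]
}
```

In this model the middle call is the reconnect that the client sees as an immediately-closed stream.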

Additionally, Ktor's sse {} handler commits 200 OK headers before handleGetRequest runs, so the intended call.reject(HttpStatusCode.Conflict, ...) cannot change the already-committed HTTP status. On all tested engines (test host, CIO, Netty), clients receive a 200 OK with an immediately-closed empty SSE stream rather than a 409 Conflict. Server-side CallLogging reports 409 (from the response object, not the wire status), which is misleading.

What this PR does

  1. Explicitly closes the old SSE session before establishing a new stream. If the old session is already dead, close() is a no-op. This is the synchronous cleanup that the async invokeOnCompletion couldn't provide.

  2. Replaces invokeOnCompletion with try/finally around awaitCancellation() — cleanup runs during cancellation propagation (earlier in the lifecycle) rather than after job completion. An identity check (=== newContext) ensures only this stream's entry is removed, not a replacement's.

  3. Adds identity-based removal in emitOnStream — a failed send() on an old stream can no longer accidentally evict the new stream's mapping.

  4. Rethrows CancellationException instead of swallowing it, preserving structured concurrency.
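Items 1 to 3 can be sketched with plain JVM collections. This is an illustration under assumptions rather than the code in this PR: `StreamContext`, `StreamRegistry`, and the constant's value are hypothetical, and `ConcurrentHashMap.remove(key, value)` compares with `equals()`, which is reference identity here only because `StreamContext` is a plain class.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Placeholder value; the SDK's actual constant may differ.
const val STANDALONE_SSE_STREAM_ID = "standalone"

// Hypothetical stand-in for the per-stream context. A plain class (not a
// data class), so equals() falls back to reference identity.
class StreamContext(val label: String) {
    var closed = false
        private set

    // Closing an already-closed context changes nothing.
    fun close() {
        closed = true
    }
}

class StreamRegistry {
    private val streamsMapping = ConcurrentHashMap<String, StreamContext>()

    /** Close-and-replace: evict and close any existing stream, then install the new one. */
    fun takeOver(newContext: StreamContext) {
        streamsMapping.remove(STANDALONE_SSE_STREAM_ID)?.close()
        streamsMapping[STANDALONE_SSE_STREAM_ID] = newContext
    }

    /** Cleanup for one stream: removes the entry only if it still maps to this context. */
    fun release(context: StreamContext): Boolean =
        streamsMapping.remove(STANDALONE_SSE_STREAM_ID, context)

    fun current(): StreamContext? = streamsMapping[STANDALONE_SSE_STREAM_ID]
}

fun demoTakeover(): List<Boolean> {
    val registry = StreamRegistry()
    val old = StreamContext("old")
    val replacement = StreamContext("replacement")
    registry.takeOver(old)
    registry.takeOver(replacement) // closes `old`
    // The old stream's cleanup runs late; it must not evict the replacement.
    val lateCleanupRemoved = registry.release(old)
    return listOf(old.closed, lateCleanupRemoved, registry.current() === replacement)
    // [true, false, true]
}
```

An unconditional `remove(STANDALONE_SSE_STREAM_ID)` in the late cleanup would have evicted the replacement, which is the failure mode the identity check guards against.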

The TypeScript reference implementation avoids this race via synchronous ReadableStream.cancel() cleanup. The Kotlin coroutine equivalent (invokeOnCompletion) is inherently async. The Go SDK uses defer stream.release() under a mutex for the same reason. The Java SDK uses unconditional AtomicReference.set() replacement with compareAndExchange on close.
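For comparison, the replace-on-connect and compare-on-close pattern attributed to the Java SDK above might look roughly like this in Kotlin. This is a sketch of the general pattern, not the Java SDK's code: `Listener` and `ListenerSlot` are hypothetical names, and it uses `getAndSet` and `compareAndSet` rather than `set` and `compareAndExchange` so the displaced listener is returned and the example runs on older JDKs.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for whatever object represents the active GET stream.
class Listener(val label: String)

class ListenerSlot {
    private val active = AtomicReference<Listener?>(null)

    /** Unconditionally installs the new listener; returns the one it displaced, if any. */
    fun replace(next: Listener): Listener? = active.getAndSet(next)

    /** Clears the slot only if `mine` is still the active listener (reference comparison). */
    fun closeIfCurrent(mine: Listener): Boolean = active.compareAndSet(mine, null)

    fun current(): Listener? = active.get()
}

fun demoSlot(): List<Boolean> {
    val slot = ListenerSlot()
    val first = Listener("first")
    val second = Listener("second")
    slot.replace(first)
    val displaced = slot.replace(second)
    val staleClose = slot.closeIfCurrent(first)   // first is no longer active
    val stillSecond = slot.current() === second
    val ownClose = slot.closeIfCurrent(second)
    return listOf(displaced === first, staleClose, stillSecond, ownClose)
    // [true, false, true, true]
}
```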

Fixes #715

How Has This Been Tested?

All tests run against latest main (bf8dc6d). ktlint and detekt pass clean.

  • 2 new unit tests (Ktor test host): reconnect after disconnect, concurrent stream takeover
  • 2 new integration tests (real CIO embedded server): same scenarios over real HTTP
  • Manual verification against a live Ktor+Netty MCP server (curl): confirmed 200 OK on the wire with immediately-closed stream on reconnect (pre-fix), live stream on reconnect (post-fix)
  • All existing server unit tests pass (76 total)
  • All existing integration tests pass (306 total)

Breaking Changes

The server no longer returns 409 Conflict for a second GET SSE stream on the same session. Instead, the old stream is explicitly closed and the new stream takes over. The 409 was already unreachable as an HTTP status — Ktor's sse {} handler commits 200 OK before the transport handler runs on all tested engines (test host, CIO, Netty), so clients never received 409 on the wire.

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Documentation update

Checklist

  • I have read the MCP Documentation
  • My code follows the repository's style guidelines
  • New and existing tests pass locally
  • I have added appropriate error handling
  • I have added or updated documentation as needed

Additional context

This bug likely became reachable after #681 (April 10) fixed the Netty appendSseHeaders crash that previously prevented GET SSE from working at all on Netty.

Cross-SDK comparison of how each SDK handles a second GET SSE on the same session:

| SDK | Behavior | Cleanup mechanism | Stale mapping bug? |
| --- | --- | --- | --- |
| TypeScript (reference) | 409 Conflict | Synchronous ReadableStream.cancel() | No |
| Go | 409 Conflict | defer stream.release() under mutex | No |
| Java | Replaces (no 409) | AtomicReference.set() + compareAndExchange on close | No |
| C# | 400 Bad Request | Boolean flag, never reset (no reconnect support) | No |
| Kotlin (before fix) | 409 (unreachable on wire) | Async invokeOnCompletion | Yes |
| Kotlin (this PR) | Closes old + replaces | try/finally + identity-based removal | No |

Contributor

Copilot AI left a comment

Pull request overview

This PR fixes Streamable HTTP GET SSE reconnect behavior by evicting/replacing a stale standalone SSE stream mapping on reconnect, avoiding a race where async cleanup (invokeOnCompletion) leaves a stale STANDALONE_SSE_STREAM_ID entry that blocks a new GET SSE stream.

Changes:

  • Replace the prior “reject second GET SSE with 409” logic with a close-and-replace strategy for the standalone GET SSE stream.
  • Add new JVM unit tests covering reconnect and concurrent GET SSE replacement behavior.
  • Add new CIO-based integration tests validating reconnect/replacement over real HTTP.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt | Updates GET SSE handling to evict/close an existing standalone stream mapping before establishing the new stream. |
| kotlin-sdk-server/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransportTest.kt | Adds unit tests for reconnect and concurrent replacement semantics for GET SSE streams. |
| integration-test/src/jvmTest/kotlin/io/modelcontextprotocol/kotlin/sdk/integration/streamablehttp/StreamableHttpSseReconnectTest.kt | Adds integration tests against an embedded CIO server for reconnect and replacement scenarios. |

Comment on lines 467 to 476
streamsMapping.remove(STANDALONE_SSE_STREAM_ID)?.let { existingStream ->
    // Close the previous SSE stream. This handles both stale streams
    // (client disconnected but cleanup hasn't fired) and live streams
    // (client opened a new connection, replacing the old one).
    try {
        existingStream.session?.close()
    } catch (_: Exception) {
        // Ignore: the old stream may already be closed.
    }
}
Comment on lines +495 to +509
// Step 3: While the first stream is open, open a second GET.
// The new stream replaces the old one (last-writer-wins).
client.prepareGet(mcpPath) {
    header(HttpHeaders.Accept, ContentType.Text.EventStream.toString())
    header(MCP_SESSION_ID_HEADER, sessionId)
    header("mcp-protocol-version", LATEST_PROTOCOL_VERSION)
}.execute { secondResponse ->
    secondResponse.status shouldBe HttpStatusCode.OK
    secondResponse.headers[MCP_SESSION_ID_HEADER] shouldBe sessionId

    val channel = secondResponse.bodyAsChannel()
    val firstLine = channel.readUTF8Line()
    firstLine.shouldNotBeNull()
    channel.isClosedForRead shouldBe false
}
Comment on lines +83 to +85
// Step 3: Immediately reconnect. The transport should detect that
// the previous stream's coroutine is cancelled and evict the stale
// mapping, allowing the new stream to succeed.
Contributor

@devcrocod devcrocod left a comment

Thank you for investigating the bug and for your contribution.

In the PR, you point out a difference from the TypeScript SDK implementation, but this PR changes the behavior contract when a conflict occurs.
I think it would be better to make cleanup synchronous instead. That way, we can preserve the existing contract while fixing the race between the old stream's invokeOnCompletion and the new put. WDYT?
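
One possible reading of this suggestion, sketched with plain JVM collections rather than the transport's real types: claim the slot atomically, reject when it is already taken, and release it synchronously on the stream's own exit path. `GetStream`, `ExclusiveStreamRegistry`, and the constant's value are hypothetical, and the sketch assumes the server notices the disconnect and calls `close` before the next GET arrives.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Placeholder value; the SDK's actual constant may differ.
const val STANDALONE_SSE_STREAM_ID = "standalone"

// Hypothetical stand-in for the per-stream state (plain class, identity equals).
class GetStream(val label: String)

class ExclusiveStreamRegistry {
    private val streamsMapping = ConcurrentHashMap<String, GetStream>()

    /**
     * Atomically claims the standalone slot. A false result means another
     * stream holds it and the caller should reject the request.
     */
    fun tryOpen(stream: GetStream): Boolean =
        streamsMapping.putIfAbsent(STANDALONE_SSE_STREAM_ID, stream) == null

    /** Synchronous cleanup; removes the entry only if it still belongs to this stream. */
    fun close(stream: GetStream) {
        streamsMapping.remove(STANDALONE_SSE_STREAM_ID, stream)
    }
}

fun demoExclusive(): List<Boolean> {
    val registry = ExclusiveStreamRegistry()
    val first = GetStream("first")
    val second = GetStream("second")
    val firstAccepted = registry.tryOpen(first)
    val conflict = registry.tryOpen(second)        // slot taken: reject
    registry.close(first)                          // cleanup completes before the retry
    val retryAccepted = registry.tryOpen(second)
    return listOf(firstAccepted, conflict, retryAccepted) // [true, false, true]
}
```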

Comment on lines -467 to -473
if (STANDALONE_SSE_STREAM_ID in streamsMapping) {
    call.reject(
        HttpStatusCode.Conflict,
        RPCError.ErrorCode.CONNECTION_CLOSED,
        "Conflict: Only one SSE stream is allowed per session",
    )
    return
Contributor

By removing the 409 rejection here, you are changing the contract.

Author

Thanks for taking a look @devcrocod

I agree with you, and that was my first intuition, but after digging in I learned:

  1. The actual "on the wire" behavior of the Kotlin SDK today is that it never returns 409; the client gets a 200 (see the Additional context section and the tests).
  2. I couldn't find any reference to this 409 requirement in the spec, but I did find it in the TypeScript implementation, so I compared three more SDKs (Java, Go, and C#) and found:
    • Java: Replaces stream (no 409)
    • Go: 409 Conflict
    • C#: 400 Bad Request

"Conflict: Only one SSE stream is allowed per session",
)
return
streamsMapping.remove(STANDALONE_SSE_STREAM_ID)?.let { existingStream ->
Contributor

Since there is no longer a return, execution reaches invokeOnCompletion, and as I understand it, this may introduce a race condition.

When a GET SSE stream disconnects and the client reconnects, the
previous stream's STANDALONE_SSE_STREAM_ID entry in streamsMapping
may still be present because cleanup via invokeOnCompletion is
asynchronous. The client gets a 200 OK (headers already committed
by Ktor's sse {} handler) with an immediately-closed empty stream,
causing a retry loop.

Changes:
- Explicitly close the old SSE session before establishing a new
  stream. If the old session is already dead this is a no-op.
- Replace invokeOnCompletion with try/finally around
  awaitCancellation() for faster cleanup during cancellation.
- Add identity-based removal (=== check) in finally and
  emitOnStream so old stream cleanup cannot evict a replacement's
  mapping.
- Rethrow CancellationException instead of swallowing it.

Fixes modelcontextprotocol#715
@amr amr force-pushed the fix/stale-sse-stream-mapping branch from 548abcb to 1443117 Compare April 24, 2026 18:31
Development

Successfully merging this pull request may close these issues.

GET SSE stream reconnect fails due to stale STANDALONE_SSE_STREAM_ID mapping

3 participants