Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 50 additions & 38 deletions src/mcp/client/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,48 +257,60 @@ async def _handle_post_request(self, ctx: RequestContext) -> None:
message = ctx.session_message.message
is_initialization = self._is_initialization_request(message)

async with ctx.client.stream(
"POST",
self.url,
json=message.model_dump(by_alias=True, mode="json", exclude_unset=True),
headers=headers,
) as response:
if response.status_code == 202:
logger.debug("Received 202 Accepted")
return

if response.status_code == 404: # pragma: no branch
if isinstance(message, JSONRPCRequest): # pragma: no branch
error_data = ErrorData(code=INVALID_REQUEST, message="Session terminated")
session_message = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data))
await ctx.read_stream_writer.send(session_message)
return
try:
async with ctx.client.stream(
"POST",
self.url,
json=message.model_dump(by_alias=True, mode="json", exclude_unset=True),
headers=headers,
) as response:
if response.status_code == 202:
logger.debug("Received 202 Accepted")
return

if response.status_code >= 400:
if isinstance(message, JSONRPCRequest):
error_data = ErrorData(code=INTERNAL_ERROR, message="Server returned an error response")
session_message = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data))
await ctx.read_stream_writer.send(session_message)
return
if response.status_code >= 400:
# Read body for error detail
await response.aread()
body_text = response.text[:200] if response.text else ""
error_msg = (
f"HTTP {response.status_code}: {body_text}" if body_text else f"HTTP {response.status_code}"
)
if isinstance(message, JSONRPCRequest):
error_data = ErrorData(
code=INTERNAL_ERROR,
message=error_msg,
)
session_message = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data))
await ctx.read_stream_writer.send(session_message)
return

if is_initialization:
self._maybe_extract_session_id_from_response(response)
if is_initialization:
self._maybe_extract_session_id_from_response(response)

# Per https://modelcontextprotocol.io/specification/2025-06-18/basic#notifications:
# The server MUST NOT send a response to notifications.
# Per https://modelcontextprotocol.io/specification/2025-06-18/basic#notifications:
# The server MUST NOT send a response to notifications.
if isinstance(message, JSONRPCRequest):
content_type = response.headers.get("content-type", "").lower()
if content_type.startswith("application/json"):
await self._handle_json_response(
response, ctx.read_stream_writer, is_initialization, request_id=message.id
)
elif content_type.startswith("text/event-stream"):
await self._handle_sse_response(response, ctx, is_initialization)
else:
logger.error(f"Unexpected content type: {content_type}")
error_data = ErrorData(code=INVALID_REQUEST, message=f"Unexpected content type: {content_type}")
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data))
await ctx.read_stream_writer.send(error_msg)
except Exception as exc:
# Propagate connection/transport errors to the caller via the read stream
# so they don't hang waiting for a response that will never arrive.
if isinstance(message, JSONRPCRequest):
content_type = response.headers.get("content-type", "").lower()
if content_type.startswith("application/json"):
await self._handle_json_response(
response, ctx.read_stream_writer, is_initialization, request_id=message.id
)
elif content_type.startswith("text/event-stream"):
await self._handle_sse_response(response, ctx, is_initialization)
else:
logger.error(f"Unexpected content type: {content_type}")
error_data = ErrorData(code=INVALID_REQUEST, message=f"Unexpected content type: {content_type}")
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data))
await ctx.read_stream_writer.send(error_msg)
error_data = ErrorData(code=INTERNAL_ERROR, message=str(exc))
session_message = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data))
with contextlib.suppress(Exception):
await ctx.read_stream_writer.send(session_message)
raise

async def _handle_json_response(
self,
Expand Down
Loading