Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/mcp-send-outside-agent-context.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"agents": patch
---

Allow `McpAgent` server-to-client requests to send from callbacks that do not inherit the agent's async context, including callbacks reached through Worker Loader RPC.
54 changes: 50 additions & 4 deletions docs/get-current-agent.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# getCurrentAgent()
# `getCurrentAgent()`

## Automatic Context for Custom Methods
## Automatic context for custom methods

**All custom methods automatically have full agent context!** The framework automatically detects and wraps your custom methods during initialization, ensuring `getCurrentAgent()` works seamlessly everywhere.
The framework detects and wraps custom Agent methods during initialization so `getCurrentAgent()` can resolve the active agent inside them and the functions they call.

## How It Works

Expand Down Expand Up @@ -124,7 +124,53 @@ export class MyAgent extends AIChatAgent {
}
```

## API Reference
## When context is lost

The agent context only propagates along the call tree of the original
invocation. Code reached outside that call tree starts with an empty context,
so `getCurrentAgent()` returns an object whose fields are `undefined`. Common
cases include:

- a host callback invoked through RPC from a Worker Loader child isolate, such
as sandboxed Codemode execution;
- a service binding or Durable Object RPC entrypoint;
- a queue consumer or another entrypoint that retains an agent reference.

Route the callback through a public method on the agent. Custom methods are
wrapped automatically, so calling `agent.someMethod()` re-enters that agent's
context:

```typescript
import { RpcTarget } from "cloudflare:workers";

class HostCallbackBridge extends RpcTarget {
constructor(private agent: MyMcpAgent) {
super();
}

// Invoked through RPC from a Worker Loader child isolate. There is no context
// ancestry. Calling a public agent method restores it automatically.
async invoke() {
return this.agent.handleSandboxCallback();
}
}

export class MyMcpAgent extends McpAgent {
async handleSandboxCallback() {
const { agent } = getCurrentAgent<MyMcpAgent>();
// `agent` is available again.
}
}
```

Context restored this way has `connection`, `request`, and `email` unset. It
is not tied to live client I/O.

Server-initiated MCP requests (`elicitInput`, `createMessage`, and `listRoots`)
on `McpAgent` do not require this indirection because the MCP transport retains
its owning agent.

## API reference

The agents package exports one main function for context management:

Expand Down
30 changes: 23 additions & 7 deletions packages/agents/src/mcp/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ export class StreamableHTTPServerTransport implements Transport {
private _started = false;
private _eventStore?: EventStore | ClearableEventStore;

// The transport and agent share a Durable Object lifetime. Retaining the
// owner lets server-initiated sends work when the caller has no agent ALS
// ancestry, such as a host callback reached through Worker Loader RPC.
// This mirrors McpSSETransport, which also captures its owner when created.
// See cloudflare/agents#1490.
private readonly _agent: McpAgent;

// This tracks which messages on each POST stream have been answered.
// It is fine that we do not persist this since it only supports backwards
// compatibility for clients batching requests, which the spec discourages.
Expand Down Expand Up @@ -157,6 +164,7 @@ export class StreamableHTTPServerTransport implements Transport {

// Initialization is handled in `McpAgent.serve()` and agents are addressed by sessionId,
// so we'll always have this available.
this._agent = agent;
this.sessionId = agent.getSessionId();
this._eventStore = options.eventStore;
}
Expand Down Expand Up @@ -423,8 +431,7 @@ export class StreamableHTTPServerTransport implements Transport {

async close(): Promise<void> {
// Close all SSE connections
const { agent } = getCurrentAgent();
if (!agent) throw new Error("Agent was not found in close");
const agent = this._agent;

for (const conn of agent.getConnections()) {
conn.close(1000, "Session closed");
Expand Down Expand Up @@ -519,8 +526,10 @@ export class StreamableHTTPServerTransport implements Transport {
* there is at most one to send on.
*/
private async sendStandalone(message: JSONRPCMessage): Promise<void> {
const { agent } = getCurrentAgent<McpAgent>();
if (!agent) throw new Error("Agent was not found in send");
// Use the captured agent rather than getCurrentAgent(): server-initiated
// sends may originate from code with no agent context on the call stack
// (e.g. a callback reached via cross-isolate RPC). See #1490.
const agent = this._agent;

const eventId = await this._eventStore?.storeEvent(
STANDALONE_STREAM_ID,
Expand Down Expand Up @@ -549,9 +558,16 @@ export class StreamableHTTPServerTransport implements Transport {
message: JSONRPCMessage,
requestId: RequestId
): Promise<void> {
const { agent, connection: originatingConnection } =
getCurrentAgent<McpAgent>();
if (!agent) throw new Error("Agent was not found in send");
const agent = this._agent;
// A valid MCP session has one active stream for a request id, so normal
// routing uses the sole matching connection below. When a client reuses
// an active id across streams, request-scoped ALS can still identify the
// origin. Cross-isolate callbacks have no ALS store; in that case the
// ambiguous path below fails closed instead of guessing. Ignore context
// from a different agent so a foreign connection cannot affect routing.
const context = getCurrentAgent<McpAgent>();
const originatingConnection =
context.agent === agent ? context.connection : undefined;

// Pick the live connection that should receive this message. Normally
// request ids uniquely identify a POST connection. If a client violates
Expand Down
93 changes: 93 additions & 0 deletions packages/agents/src/tests/agents/mcp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
Agent,
callable,
getCurrentAgent,
__DO_NOT_USE_WILL_BREAK__agentContext as agentContext,
type AgentContext
} from "../../index.ts";
import {
Expand Down Expand Up @@ -225,6 +226,98 @@ export class TestMcpAgent extends McpAgent<Cloudflare.Env, unknown, Props> {
}
);

// The next two tools run their body inside `agentContext.exit(...)`,
// i.e. with an empty AsyncLocalStorage store. This simulates host-side
// callbacks reached outside the original invocation's call tree — e.g.
// via RPC from a Worker Loader child isolate or a service binding —
// where `getCurrentAgent()` returns undefined. Regression coverage for
// https://github.com/cloudflare/agents/issues/1490.
this.server.tool(
"elicitNameOutsideContext",
"Elicit user input from outside the agent ALS context (request-scoped send)",
{},
async (_args, extra) => {
const result = await agentContext.exit(() =>
this.server.server.elicitInput(
{
message: "What is your name?",
requestedSchema: {
type: "object",
properties: {
name: {
type: "string",
description: "Your name"
}
},
required: ["name"]
}
},
{ relatedRequestId: extra.requestId }
)
);

if (result.action === "accept" && result.content?.name) {
return {
content: [
{
type: "text",
text: `Outside-context elicit: ${result.content.name}`
}
]
};
}

return {
content: [{ type: "text", text: "Outside-context elicit cancelled" }]
};
}
);

this.server.tool(
"elicitNameOutsideContextStandalone",
"Elicit user input from outside the agent ALS context (standalone GET stream send)",
{},
async () => {
// No relatedRequestId: the elicit request goes out on the
// standalone GET stream via the transport's sendStandalone path.
const result = await agentContext.exit(() =>
this.server.server.elicitInput({
message: "What is your name?",
requestedSchema: {
type: "object",
properties: {
name: {
type: "string",
description: "Your name"
}
},
required: ["name"]
}
})
);

if (result.action === "accept" && result.content?.name) {
return {
content: [
{
type: "text",
text: `Standalone outside-context elicit: ${result.content.name}`
}
]
};
}

return {
content: [
{
type: "text",
text: "Standalone outside-context elicit cancelled"
}
]
};
}
);

// Use `registerTool` so we can later remove it.
// Triggers notifications/tools/list_changed
this.server.registerTool(
Expand Down
151 changes: 151 additions & 0 deletions packages/agents/src/tests/mcp/outside-context-send.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import { createExecutionContext } from "cloudflare:test";
import type {
CallToolResult,
JSONRPCMessage,
JSONRPCRequest,
JSONRPCResultResponse
} from "@modelcontextprotocol/sdk/types.js";
import { describe, expect, it } from "vitest";
import {
initializeStreamableHTTPServer,
openStandaloneSSE,
parseSSEData,
sendPostRequest
} from "../shared/test-utils";

async function readOneFrame(
reader: ReadableStreamDefaultReader<Uint8Array>
): Promise<string> {
const { value } = await reader.read();
if (!value) throw new Error("SSE stream ended before a frame arrived");
return new TextDecoder().decode(value);
}

/**
* Regression tests for https://github.com/cloudflare/agents/issues/1490.
*
* Server-initiated MCP requests (elicitInput, createMessage, listRoots)
* used to throw "Agent was not found in send" when issued from code with
* no agent AsyncLocalStorage context on its call stack, such as a host-side
* callback invoked via RPC from a Worker Loader child isolate. The test
* tools simulate that by running `this.server.server.elicitInput(...)`
* inside `agentContext.exit(...)`, which strips the store exactly like a
* fresh entrypoint invocation does.
*/
describe("server-initiated sends outside the agent ALS context", () => {
const baseUrl = "http://example.com/mcp";

it("routes a request-scoped elicit (relatedRequestId) issued outside the context", async () => {
const ctx = createExecutionContext();
const sessionId = await initializeStreamableHTTPServer(ctx);

const toolCallMsg: JSONRPCMessage = {
id: "outside-ctx-1",
jsonrpc: "2.0",
method: "tools/call",
params: { name: "elicitNameOutsideContext", arguments: {} }
};

const toolResponse = await sendPostRequest(
ctx,
baseUrl,
toolCallMsg,
sessionId
);
expect(toolResponse.status).toBe(200);

const reader = toolResponse.body?.getReader();
if (!reader) throw new Error("No reader available for POST stream");

// Before the fix this frame never arrived: the transport threw
// "Agent was not found in send" and the tool call errored out.
const elicitFrame = await readOneFrame(reader);
const elicitRequest = parseSSEData(elicitFrame) as JSONRPCRequest;
expect(elicitRequest.method).toBe("elicitation/create");

const elicitResponse: JSONRPCMessage = {
jsonrpc: "2.0",
id: elicitRequest.id,
result: {
action: "accept",
content: { name: "Alice" }
}
} as unknown as JSONRPCMessage;

const responsePost = await sendPostRequest(
ctx,
baseUrl,
elicitResponse,
sessionId
);
expect(responsePost.status).toBe(202);

const toolResultFrame = await readOneFrame(reader);
const toolResult = parseSSEData(toolResultFrame) as JSONRPCResultResponse;

expect(toolResult.id).toBe("outside-ctx-1");
const result = toolResult.result as CallToolResult;
expect(result.content).toEqual([
{ type: "text", text: "Outside-context elicit: Alice" }
]);
});

it("delivers a standalone elicit (no relatedRequestId) issued outside the context on the GET stream", async () => {
const ctx = createExecutionContext();
const sessionId = await initializeStreamableHTTPServer(ctx);

// Server-initiated requests without relatedRequestId go out on the
// standalone GET stream (transport sendStandalone path).
const standaloneReader = await openStandaloneSSE(ctx, sessionId, baseUrl);

const toolCallMsg: JSONRPCMessage = {
id: "outside-ctx-standalone-1",
jsonrpc: "2.0",
method: "tools/call",
params: { name: "elicitNameOutsideContextStandalone", arguments: {} }
};

const toolResponse = await sendPostRequest(
ctx,
baseUrl,
toolCallMsg,
sessionId
);
expect(toolResponse.status).toBe(200);

const postReader = toolResponse.body?.getReader();
if (!postReader) throw new Error("No reader available for POST stream");

const elicitFrame = await readOneFrame(standaloneReader);
const elicitRequest = parseSSEData(elicitFrame) as JSONRPCRequest;
expect(elicitRequest.method).toBe("elicitation/create");

const elicitResponse: JSONRPCMessage = {
jsonrpc: "2.0",
id: elicitRequest.id,
result: {
action: "accept",
content: { name: "Bob" }
}
} as unknown as JSONRPCMessage;

const responsePost = await sendPostRequest(
ctx,
baseUrl,
elicitResponse,
sessionId
);
expect(responsePost.status).toBe(202);

const toolResultFrame = await readOneFrame(postReader);
const toolResult = parseSSEData(toolResultFrame) as JSONRPCResultResponse;

expect(toolResult.id).toBe("outside-ctx-standalone-1");
const result = toolResult.result as CallToolResult;
expect(result.content).toEqual([
{ type: "text", text: "Standalone outside-context elicit: Bob" }
]);

await standaloneReader.cancel();
});
});
Loading