Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 50 additions & 8 deletions src/llm-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
//
// Used by the synthesizer so tokens land on stdout as the model writes them
// instead of making the user stare at a blank terminal for 30+ seconds on a
// deep query. Retry applies to the initial connect only — mid-stream
// failures propagate because we've already emitted bytes to the caller.
// deep query. Retry wraps the initial connect only; once the stream flows a
// failure surfaces to the caller. The stream is bounded by an idle-token
// deadline so a stalled response fails fast instead of hanging.

import { trimTrailingSlashes } from "./url-util.js";
import { retry } from "./retry.js";
Expand Down Expand Up @@ -59,9 +60,10 @@ export async function callLLMStream(
const timeoutMs = config.timeoutMs ?? DEFAULT_LLM_TIMEOUT_MS;
const attempts = Math.max(1, config.maxAttempts ?? DEFAULT_LLM_ATTEMPTS);

// Retry wraps the initial connect only. Once we start emitting tokens
// through onToken, a mid-stream failure can't be undone, so we let it
// surface to the caller instead of silently retrying.
// Retry wraps the initial connect only. Once the stream is flowing we don't
// retry: a mid-stream failure on this synthesis is (empirically, #104) a
// persistent upstream stall, so re-issuing the request just burns the same
// wall-clock again; the idle-token deadline below fails it fast instead.
const res = await retry(
async () => {
const combined = makeTimeoutSignal(timeoutMs, signal);
Expand Down Expand Up @@ -102,7 +104,11 @@ export async function callLLMStream(
let text = "";
let usage: LLMResult["usage"];

for await (const raw of parseSSE(res.body, signal)) {
// Bound the stream by an idle-token deadline (#104). A healthy long
// generation never goes a full `timeoutMs` without producing data, so it
// streams to completion even when its total duration exceeds `timeoutMs`; a
// genuine stall (nothing received for `timeoutMs`) aborts here instead of
// hanging until the global --max-runtime (or forever, if unset).
for await (const raw of parseSSE(res.body, signal, timeoutMs)) {
const event =
format === "openai" ? openaiSSEToAnthropic(raw) ?? raw : raw;
const type = event.type;
Expand Down Expand Up @@ -135,18 +141,21 @@ interface SSEEvent {
[key: string]: unknown;
}

// Exported for unit tests.
// Exported for unit tests. `idleMs`, when set, bounds the gap between chunks:
// if no data arrives for that long the underlying stream is cancelled and the
// generator throws a TimeoutError, so a stalled response can't hang forever.
export async function* parseSSE(
stream: ReadableStream<Uint8Array>,
signal?: AbortSignal,
idleMs?: number,
): AsyncGenerator<SSEEvent> {
const reader = stream.getReader();
const decoder = new TextDecoder();
let buffer = "";
try {
while (true) {
if (signal?.aborted) throw new Error("aborted");
const { done, value } = await reader.read();
const { done, value } = await readChunk(reader, idleMs);
if (done) {
// Flush any trailing event without blank-line terminator.
if (buffer.trim().length > 0) {
Expand All @@ -171,6 +180,39 @@ export async function* parseSSE(
}
}

// Race a single read against an idle deadline. On timeout, cancel the stream
// so the pending read settles (a locked reader with a pending read can't
// releaseLock cleanly) and surface a TimeoutError to the generator.
async function readChunk(
reader: ReadableStreamDefaultReader<Uint8Array>,
idleMs?: number,
): Promise<ReadableStreamReadResult<Uint8Array>> {
if (!idleMs) return reader.read();
const read = reader.read();
let timer: ReturnType<typeof setTimeout> | undefined;
const idle = new Promise<never>((_, reject) => {
// NOT unref'd: while a stream is in flight this timer represents real
// pending work (we want the loop kept alive to await the next token). It
// is always cleared on a chunk or fires within idleMs, so it never holds
// the process open beyond the read it guards.
timer = setTimeout(
() => reject(new DOMException(`stream idle for ${idleMs}ms`, "TimeoutError")),
idleMs,
);
});
try {
return await Promise.race([read, idle]);
} catch (err) {
// Cancel so the still-pending read settles — a never-closing stream would
// otherwise leave a dangling promise — then surface the timeout.
await reader.cancel(err).catch(() => undefined);
await read.catch(() => undefined);
throw err;
} finally {
clearTimeout(timer);
}
}

function indexOfBlankLine(s: string, from: number): number {
const a = s.indexOf("\n\n", from);
const b = s.indexOf("\r\n\r\n", from);
Expand Down
22 changes: 13 additions & 9 deletions src/synthesize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
// sources, asks the LLM to produce a cited markdown answer. Sources are
// passed as a numbered list so the model can cite them inline as [1], [2].
//
// When `onToken` is provided, uses the streaming variant so tokens land in
// front of the user as the model writes them instead of after a 30+s wait.
// Synthesis always streams (see the call site). When `onToken` is provided
// (interactive TTY), tokens land in front of the user as the model writes
// them; otherwise the stream is accumulated and returned in one shot.

import { callLLM, type LLMConfig, type LLMResult } from "./llm.js";
import { type LLMConfig } from "./llm.js";
import { callLLMStream } from "./llm-stream.js";
import type { Source } from "./citations.js";
import type { UsageSink } from "./plan.js";
Expand Down Expand Up @@ -59,12 +60,15 @@ export async function synthesize(
`Sources (${sources.length}):\n\n${packet}\n\n` +
`Write the cited markdown answer now.`;
const messages = [{ role: "user" as const, content: userMessage }];
let result: LLMResult;
if (onToken) {
result = await callLLMStream(messages, system, config, { onToken }, signal);
} else {
result = await callLLM(messages, system, config, signal);
}
// Always stream the synthesis (#104). A large, table-heavy answer can take
// 100-150s to generate; the non-streaming client's whole-call timeout
// (DEFAULT_LLM_TIMEOUT_MS, 120s) intermittently fired mid-generation and
// burned three full retries (~360s) before failing. The streaming client
// bounds only the connect by that timeout and the generation by an
// idle-token deadline, so a long-but-healthy stream finishes in one pass
// while a genuine stall still fails fast. `onToken` is undefined in
// non-TTY / --json mode — callLLMStream then just accumulates and returns.
const result = await callLLMStream(messages, system, config, { onToken }, signal);
if (result.usage && onUsage) onUsage(result.usage);
return result.text;
}
Expand Down
20 changes: 20 additions & 0 deletions test/agent-loop.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,26 @@ function makeLLMServer(responseQueue, usageQueue) {
calls.push({ system: parsed.system, messages: parsed.messages });
const text = responseQueue.shift() ?? "(no more canned responses)";
const usage = usageQueue?.shift() ?? { input_tokens: 10, output_tokens: 10 };
if (parsed.stream) {
// Synthesis streams (callLLMStream). Emit a minimal Anthropic SSE
// sequence: input usage in message_start, the text as one text_delta,
// output usage in message_delta.
res.writeHead(200, { "content-type": "text/event-stream" });
const frame = (obj) => `data: ${JSON.stringify(obj)}\n\n`;
res.write(
frame({
type: "message_start",
message: { usage: { input_tokens: usage.input_tokens, output_tokens: 0 } },
}),
);
res.write(
frame({ type: "content_block_delta", delta: { type: "text_delta", text } }),
);
res.write(frame({ type: "message_delta", usage: { output_tokens: usage.output_tokens } }));
res.write(frame({ type: "message_stop" }));
res.end();
return;
}
const payload = {
id: "msg_test",
type: "message",
Expand Down
71 changes: 71 additions & 0 deletions test/llm-stream.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,45 @@ test("parseSSE: accepts CRLF line endings", async () => {
assert.equal(events[0].delta.text, "x");
});

// A stream that delivers `parts` then hangs (never closes) — models a server
// that stops sending mid-response.
function stallingStreamOf(parts) {
const encoder = new TextEncoder();
const queue = parts.map((p) => encoder.encode(p));
return new ReadableStream({
start(controller) {
for (const chunk of queue) controller.enqueue(chunk);
// deliberately no controller.close(): the next read() never settles.
},
});
}

test("parseSSE: idle timeout aborts a stalled stream (#104)", async () => {
// First frame arrives, then the stream stalls — the idle deadline must fire.
const stream = stallingStreamOf(['data: {"type":"message_start"}\n\n']);
await assert.rejects(
(async () => {
for await (const _e of parseSSE(stream, undefined, 30)) {
/* drain; the stall happens on the read after the first frame */
}
})(),
(err) => err?.name === "TimeoutError",
);
});

test("parseSSE: idleMs set but a prompt stream completes without false timeout", async () => {
const frames = [
'data: {"type":"message_start"}\n\n',
'data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"ok"}}\n\n',
'data: {"type":"message_stop"}\n\n',
];
const events = [];
// Generous idle bound; streamOf closes promptly, so no timeout should fire.
for await (const e of parseSSE(streamOf(frames), undefined, 1000)) events.push(e);
assert.equal(events.length, 3);
assert.equal(events[1].delta.text, "ok");
});

// ──────── callLLMStream: integration ───────────────────────────────────────

function makeSSEResponder(frames) {
Expand Down Expand Up @@ -224,3 +263,35 @@ test("callLLMStream: non-text_delta events are ignored gracefully", async () =>
await stop(server);
}
});

test("callLLMStream: a mid-stream stall surfaces (no mid-stream retry)", async () => {
// The stream opens, sends one token, then stalls. callLLMStream retries the
// connect only — so the idle-token deadline must surface a TimeoutError
// rather than silently re-issuing the request.
let calls = 0;
const tokens = [];
const server = http.createServer((_req, res) => {
calls++;
res.writeHead(200, { "content-type": "text/event-stream" });
res.write(
'data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"tok"}}\n\n',
);
// hang after the first token → stream stalls.
});
const baseUrl = await start(server);
try {
await assert.rejects(
callLLMStream(
[{ role: "user", content: "hi" }],
"sys",
{ baseUrl, apiKey: "t", model: "test", maxTokens: 10, timeoutMs: 150, maxAttempts: 3 },
{ onToken: (t) => tokens.push(t) },
),
(err) => err?.name === "TimeoutError",
);
assert.deepEqual(tokens, ["tok"]);
assert.equal(calls, 1); // connected once, no mid-stream retry
} finally {
await stop(server);
}
});