diff --git a/docs/develop/python/integrations/langgraph.mdx b/docs/develop/python/integrations/langgraph.mdx index e0cf84e911..471256937b 100644 --- a/docs/develop/python/integrations/langgraph.mdx +++ b/docs/develop/python/integrations/langgraph.mdx @@ -331,6 +331,133 @@ class LongRunningWorkflow: workflow.continue_as_new(args=[state, cache()]) ``` +## Streaming + +To stream intermediate values (such as LLM tokens or progress updates) out of a running +graph, set `streaming_topic` on `LangGraphPlugin`. Calls to LangGraph's +`get_stream_writer()` inside a node then publish to the named topic on the Workflow's +[`WorkflowStream`](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/workflow_streams), +and external subscribers consume the stream with `WorkflowStreamClient`. + +Activity nodes publish via a batched Temporal signal, controlled by +`streaming_batch_interval` (default 100ms). Workflow nodes publish synchronously to the +in-Workflow stream, with no signal. + +When `streaming_topic` is set, your Workflow **must** construct a `WorkflowStream()` in its +`@workflow.init` (its `__init__`); otherwise the plugin raises an error. + +```python +from datetime import timedelta + +from langgraph.config import get_stream_writer +from langgraph.graph import START, StateGraph +from typing_extensions import TypedDict + +from temporalio import workflow +from temporalio.contrib.langgraph import LangGraphPlugin, graph +from temporalio.contrib.workflow_streams import WorkflowStream, WorkflowStreamClient + + +class State(TypedDict): + value: str + + +async def token_node(state: State) -> dict[str, str]: + writer = get_stream_writer() + for token in ["hello", " ", "world"]: + writer({"token": token}) + writer({"done": True}) + return {"value": "hello world"} + + +@workflow.defn +class StreamingWorkflow: + def __init__(self) -> None: + # Required when streaming_topic is set on the plugin. + _ = WorkflowStream() + self.app = graph("streaming").compile() + + @workflow.run + async def run(self) -> str: + result = await self.app.ainvoke({"value": ""}) + return result["value"] +``` + +Configure the plugin with `streaming_topic`: + +```python +g = StateGraph(State) +g.add_node("token_node", token_node, metadata={"execute_in": "activity"}) +g.add_edge(START, "token_node") + +plugin = LangGraphPlugin( + graphs={"streaming": g}, + default_activity_options={"start_to_close_timeout": timedelta(seconds=10)}, + streaming_topic="tokens", +) +``` + +An external client subscribes to the topic to consume items as they're published: + +```python +handle = await client.start_workflow( + StreamingWorkflow.run, id="streaming-wf", task_queue="streaming-tq" +) + +ws_client = WorkflowStreamClient.create(client, handle.id) +async for item in ws_client.topic("tokens", type=dict).subscribe(from_offset=0): + print(item.data) + if item.data.get("done"): + break + +print(await handle.result()) +``` + +:::note + +Streaming from a node running in the Workflow (`execute_in: "workflow"`) requires Python +3.11 or newer, because LangGraph relies on `contextvars` propagation through +`asyncio.create_task()`. See the [installation note](#install-the-plugin) above. + +::: + +### What's covered, and what isn't + +`streaming_topic` wires up exactly one LangGraph stream mode: `stream_mode="custom"` — the +values written through `get_stream_writer()`. The other modes (`"messages"`, `"values"`, +`"updates"`, and `"debug"`) are **not** captured by `streaming_topic`, because they aren't +produced by node-side writers; LangGraph's orchestrator emits them as it walks the graph. + +To stream one of those modes, bridge `astream()` in the Workflow and republish each yielded +chunk to a `WorkflowStream` topic yourself: + +```python +@workflow.defn +class AstreamBridge: + def __init__(self) -> None: + self.stream = WorkflowStream() + self.app = graph("g").compile() + + @workflow.run + async def run(self) -> None: + topic = self.stream.topic("astream") + async for chunk in self.app.astream({...}, stream_mode="messages"): + topic.publish(chunk) + topic.publish({"done": True}) +``` + +### Retry semantics + +Streaming has **at-least-once** delivery per Activity attempt. When an Activity-wrapped node +retries (transient failure, worker crash, and so on), the node function re-runs from scratch +and re-publishes its writes — earlier publishes from the failed attempt are not rolled back. +Subscribers should be ready to see duplicates and recover idempotently: dedupe on a sequence +ID you include in each chunk, or treat the stream as advisory and rely on the Workflow's final +result for state. + +For a complete working example, see the +[streaming sample](https://github.com/temporalio/samples-python/tree/main/langgraph_plugin/graph_api/streaming). + ## Tracing For tracing your LangGraph Workflows and Activities, we recommend the