127 changes: 127 additions & 0 deletions docs/develop/python/integrations/langgraph.mdx
@@ -331,6 +331,133 @@ class LongRunningWorkflow:
workflow.continue_as_new(args=[state, cache()])
```

## Streaming

To stream intermediate values (such as LLM tokens or progress updates) out of a running
graph, set `streaming_topic` on `LangGraphPlugin`. Calls to LangGraph's
`get_stream_writer()` inside a node then publish to the named topic on the Workflow's
[`WorkflowStream`](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/workflow_streams),
and external subscribers consume the stream with `WorkflowStreamClient`.

Activity nodes publish via a batched Temporal signal, controlled by
`streaming_batch_interval` (default 100ms). Workflow nodes publish synchronously to the
in-Workflow stream, with no signal.
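
To make the batching behavior concrete, here is a minimal, self-contained sketch of interval-style batching in plain `asyncio`. It illustrates the general idea only and is not the plugin's implementation: `IntervalBatcher`, `send`, and `flush` are hypothetical names, and the real plugin delivers batches through a Temporal signal rather than a local callback.

```python
import asyncio
from typing import Any, Awaitable, Callable


class IntervalBatcher:
    """Hypothetical helper: buffers items and sends them as one list per flush."""

    def __init__(
        self,
        send: Callable[[list[Any]], Awaitable[None]],
        interval: float = 0.1,  # seconds; mirrors the 100ms default mentioned above
    ) -> None:
        self._send = send
        self._interval = interval
        self._buffer: list[Any] = []

    def publish(self, item: Any) -> None:
        # Publishing only appends to the buffer; nothing is sent yet.
        self._buffer.append(item)

    async def flush(self) -> None:
        # Send everything buffered so far as a single batch, if there is any.
        if self._buffer:
            batch, self._buffer = self._buffer, []
            await self._send(batch)

    async def run(self, stop: asyncio.Event) -> None:
        # Flush once per interval until asked to stop, then flush the remainder.
        while not stop.is_set():
            await asyncio.sleep(self._interval)
            await self.flush()
        await self.flush()


async def demo() -> list[list[Any]]:
    sent: list[list[Any]] = []

    async def send(batch: list[Any]) -> None:
        sent.append(batch)

    batcher = IntervalBatcher(send)
    batcher.publish({"token": "hello"})
    batcher.publish({"token": " "})
    await batcher.flush()  # one batch containing two items
    await batcher.flush()  # buffer is empty, so nothing is sent
    batcher.publish({"token": "world"})
    await batcher.flush()
    return sent


batches = asyncio.run(demo())
```

The practical consequence for subscribers is that items written close together by an Activity node tend to arrive in bursts, one burst per interval, rather than one at a time.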

When `streaming_topic` is set, your Workflow **must** construct a `WorkflowStream()` in its
`@workflow.init` (its `__init__`); otherwise the plugin raises an error.

```python
from datetime import timedelta

from langgraph.config import get_stream_writer
from langgraph.graph import START, StateGraph
from typing_extensions import TypedDict

from temporalio import workflow
from temporalio.contrib.langgraph import LangGraphPlugin, graph
from temporalio.contrib.workflow_streams import WorkflowStream, WorkflowStreamClient


class State(TypedDict):
    value: str


async def token_node(state: State) -> dict[str, str]:
    writer = get_stream_writer()
    for token in ["hello", " ", "world"]:
        writer({"token": token})
    writer({"done": True})
    return {"value": "hello world"}


@workflow.defn
class StreamingWorkflow:
    def __init__(self) -> None:
        # Required when streaming_topic is set on the plugin.
        _ = WorkflowStream()
        self.app = graph("streaming").compile()

    @workflow.run
    async def run(self) -> str:
        result = await self.app.ainvoke({"value": ""})
        return result["value"]
```

Configure the plugin with `streaming_topic`:

```python
g = StateGraph(State)
g.add_node("token_node", token_node, metadata={"execute_in": "activity"})
g.add_edge(START, "token_node")

plugin = LangGraphPlugin(
    graphs={"streaming": g},
    default_activity_options={"start_to_close_timeout": timedelta(seconds=10)},
    streaming_topic="tokens",
)
```

An external client subscribes to the topic to consume items as they're published:

```python
handle = await client.start_workflow(
    StreamingWorkflow.run, id="streaming-wf", task_queue="streaming-tq"
)

ws_client = WorkflowStreamClient.create(client, handle.id)
async for item in ws_client.topic("tokens", type=dict).subscribe(from_offset=0):
    print(item.data)
    if item.data.get("done"):
        break

print(await handle.result())
```
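
On the consuming side, the chunk shape used above (`{"token": ...}` items followed by `{"done": True}`) can be folded into a final string. The sketch below is independent of Temporal: `collect_tokens` and `fake_subscription` are hypothetical helpers, and the fake generator stands in for the `item.data` values a real subscription would yield.

```python
import asyncio
from typing import Any, AsyncIterator


async def collect_tokens(chunks: AsyncIterator[dict[str, Any]]) -> str:
    """Join "token" values until a chunk with a truthy "done" key arrives."""
    parts: list[str] = []
    async for chunk in chunks:
        if chunk.get("done"):
            break
        if "token" in chunk:
            parts.append(chunk["token"])
    return "".join(parts)


async def fake_subscription() -> AsyncIterator[dict[str, Any]]:
    # Stand-in for a real subscription; yields the same chunks token_node writes.
    for chunk in [{"token": "hello"}, {"token": " "}, {"token": "world"}, {"done": True}]:
        yield chunk


text = asyncio.run(collect_tokens(fake_subscription()))
```

Stopping on the `done` marker matters because the subscription itself does not end when the node finishes writing; the marker is the only signal in this protocol that no more tokens are coming.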

:::note

Streaming from a node running in the Workflow (`execute_in: "workflow"`) requires Python
3.11 or newer, because LangGraph relies on `contextvars` propagation through
`asyncio.create_task()`. See the [installation note](#install-the-plugin) above.

:::

### What's covered, and what isn't

`streaming_topic` wires up exactly one LangGraph stream mode: `stream_mode="custom"` — the
values written through `get_stream_writer()`. The other modes (`"messages"`, `"values"`,
`"updates"`, and `"debug"`) are **not** captured by `streaming_topic`, because they aren't
produced by node-side writers; LangGraph's orchestrator emits them as it walks the graph.

To stream one of those modes, bridge `astream()` in the Workflow and republish each yielded
chunk to a `WorkflowStream` topic yourself:

```python
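from temporalio import workflow
from temporalio.contrib.langgraph import graph
from temporalio.contrib.workflow_streams import WorkflowStream


@workflow.defn
class AstreamBridge:
    def __init__(self) -> None:
        self.stream = WorkflowStream()
        self.app = graph("g").compile()

    @workflow.run
    async def run(self) -> None:
        topic = self.stream.topic("astream")
        # Republish every chunk the orchestrator yields for this stream mode.
        async for chunk in self.app.astream({...}, stream_mode="messages"):
            topic.publish(chunk)
        # Sentinel so subscribers know the stream is finished.
        topic.publish({"done": True})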
```

### Retry semantics

Streaming provides **at-least-once** delivery, and duplicates arise across Activity attempts.
When an Activity-wrapped node retries (after a transient failure or a worker crash, for
example), the node function re-runs from scratch and republishes its writes; publishes from
the failed attempt are not rolled back. Subscribers should expect duplicates and handle them
idempotently: dedupe on a sequence ID you include in each chunk, or treat the stream as
advisory and rely on the Workflow's final result for state.
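
One way to apply the sequence-ID approach on the subscriber side is sketched below. It assumes each chunk is a dict carrying a hypothetical `seq` field that the node assigns deterministically (for example, the index of the write within the node), so a retried attempt reuses the same values. Neither the field name nor the `dedupe_by_seq` helper is part of the plugin.

```python
from typing import Any, Iterable, Iterator


def dedupe_by_seq(chunks: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
    """Yield each chunk once, keyed on its hypothetical "seq" field."""
    seen: set[int] = set()
    for chunk in chunks:
        seq = chunk["seq"]
        if seq in seen:
            # Already delivered by an earlier attempt; skip the replay.
            continue
        seen.add(seq)
        yield chunk


# Attempt 1 publishes two chunks and then fails; attempt 2 replays from scratch.
received = [
    {"seq": 0, "token": "hello"},
    {"seq": 1, "token": " "},
    {"seq": 0, "token": "hello"},
    {"seq": 1, "token": " "},
    {"seq": 2, "token": "world"},
]
unique = list(dedupe_by_seq(received))
text = "".join(chunk["token"] for chunk in unique)
```

This only works if `seq` is stable across attempts, so it should come from the node's own logic rather than from a random ID or a timestamp.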

For a complete working example, see the
[streaming sample](https://github.com/temporalio/samples-python/tree/main/langgraph_plugin/graph_api/streaming).

## Tracing

For tracing your LangGraph Workflows and Activities, we recommend the