Skip to content

Commit 5ceb3c8

Browse files
brianstrauchclaudelennessyy
authored
Add Strands plugin samples (#310)
* Add Strands plugin samples Demonstrates the Temporal Strands plugin: hello world, tools (in-workflow, custom activity, strands_tools), HITL, hooks, MCP, structured output, streaming, interrupt, and continue-as-new. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * Convert continue_as_new chat to workflow update `user_says` → `turn`, now a `@workflow.update` returning the assistant's reply directly so callers no longer need a separate `messages` query to discover what the agent said. The run loop drains in-flight handlers via `workflow.all_handlers_finished` before continue-as-new. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * Fix interrupt sample: make delete_thing idempotent The activity was toggling its `_APPROVED` state (add on raise, discard on success), so after the human approved once, a follow-up tool call from the model would interrupt again with no further approval coming — hanging the test. Drop the discard so once approved, the name stays approved. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * Fix ruff import order in strands mock model ruff's `I001` rule groups `temporalio` with other third-party imports rather than its own block, which broke `poe lint` on CI. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * Rename Strands sample extra * Use Windows-compatible Strands tool sample * Bump sdk-python pin to pick up Strands MCP client fix Picks up c84320c6 on the strands branch, which rewrites the MCP populate_cache and call_tool activity to use ClientSession directly instead of MCPClient.start/stop. The old code created a background event loop on one thread and closed it from another, which deadlocked mcp_test on Python 3.10 once trio_asyncio's policy was installed at import time by tests/trio_async/workflow_test.py. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * Rename Strands interrupt sample to activity_interrupt Disambiguates from the hook-based human_in_the_loop sample by naming this one after where the interrupt is raised (a Temporal activity). Also fixes the parent README's broken `tool_interrupt` link. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * Fix uv sync group name in Strands READMEs The dependency group is named strands-agents, not strands. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * Test the continue-as-new path in the Strands chat sample Replace test_continue_as_new_chat, which never tripped the continue-as-new branch, with test_continue_as_new_carries_history. A low limit.historyCount.suggestContinueAsNew threshold makes the server suggest continue-as-new after one turn, so the test verifies the original run ends in CONTINUED_AS_NEW and the fresh run resumes with the carried-over messages before taking another turn and ending the chat. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * add snipsync lines * Remove unnecessary langsmith constraint temporalio 1.28.0's langsmith extra already pins langsmith>=0.7.34,<0.9, so the manual constraint-dependencies entry is redundant. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * Fix formatting in strands_plugin workflows ruff format requires two blank lines before the trailing @@@SNIPEND markers added in 12f9d5d. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Co-authored-by: Lenny Chen <55669665+lennessyy@users.noreply.github.com> Co-authored-by: Lenny Chen <lenny.chen@temporal.io>
1 parent 000d67d commit 5ceb3c8

62 files changed

Lines changed: 2388 additions & 3 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ Some examples require extra dependencies. See each sample's directory for specif
8989
* [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
9090
* [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
9191
* [sentry](sentry) - Report errors to Sentry.
92+
* [strands_plugin](strands_plugin) - Run Strands Agents as durable Temporal workflows (model calls, tools, MCP, HITL).
9293
* [trio_async](trio_async) - Use asyncio Temporal in Trio-based environments.
9394
* [updatable_timer](updatable_timer) - A timer that can be updated while sleeping.
9495
* [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.

pyproject.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,13 @@ openai-agents = [
6161
]
6262
pydantic-converter = ["pydantic>=2.10.6,<3"]
6363
sentry = ["sentry-sdk>=2.13.0"]
64+
strands-agents = [
65+
"strands-agents>=1.39.0",
66+
"strands-agents-tools>=0.5.2",
67+
"mcp>=1.0.0",
68+
"boto3>=1.34.92,<2",
69+
"temporalio[strands-agents,pydantic]>=1.28.0",
70+
]
6471
trio-async = ["trio>=0.28.0,<0.29", "trio-asyncio>=0.15.0,<0.16"]
6572
cloud-export-to-parquet = [
6673
"pandas>=2.3.3,<3 ; python_version >= '3.10' and python_version < '4.0'",
@@ -115,6 +122,7 @@ packages = [
115122
"schedules",
116123
"sentry",
117124
"sleep_for_days",
125+
"strands_plugin",
118126
"tests",
119127
"trio_async",
120128
"updatable_timer",

strands_plugin/README.md

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
# Strands Agents Samples
2+
3+
These samples demonstrate the [Temporal Strands plugin](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/strands), which runs [Strands Agents](https://strandsagents.com/) inside Temporal Workflows. Model invocations, tool calls, and MCP tool calls all execute as Temporal Activities, so you get durable execution, Temporal-managed retries, and timeouts.
4+
5+
## Samples
6+
7+
| Sample | Description |
8+
|--------|-------------|
9+
| [hello_world](hello_world) | Minimal `TemporalAgent` invocation. Start here. |
10+
| [tools](tools) | Three tool patterns side by side: in-workflow `@tool`, custom `@activity.defn` wrapped via `activity_as_tool`, and a `strands_tools` tool wrapped as a Temporal activity. |
11+
| [human_in_the_loop](human_in_the_loop) | Pause a tool call on `BeforeToolCallEvent.interrupt()`, resume via Temporal signal. The canonical Strands HITL pattern. |
12+
| [activity_interrupt](activity_interrupt) | Raise `InterruptException` from a Temporal activity to surface a HITL prompt across the activity boundary. Plugin-specific feature. |
13+
| [hooks](hooks) | `HookProvider` with both an in-workflow callback and an `activity_as_hook` callback for I/O. |
14+
| [mcp](mcp) | Connect to an MCP server (`FastMCP` echo) via `TemporalMCPClient`. |
15+
| [structured_output](structured_output) | Pydantic-typed agent output via `structured_output_model`. |
16+
| [streaming](streaming) | Forward model chunks to an external subscriber via `streaming_topic` + `WorkflowStream`. |
17+
| [continue_as_new](continue_as_new) | Chat-style workflow that hands off `agent.messages` when history grows large. |
18+
19+
## Prerequisites
20+
21+
1. Install dependencies:
22+
23+
```bash
24+
uv sync --group strands-agents
25+
```
26+
27+
> The `strands` extra of `temporalio` is shipping in an upcoming release. Until then, install the SDK from the strands branch:
28+
>
29+
> ```bash
30+
> uv pip install -e ../sdk-python --extra strands-agents --extra pydantic
31+
> ```
32+
33+
2. Configure AWS credentials. The samples use the plugin's default `BedrockModel()`, which picks up the standard AWS SDK credential chain. Make sure the credentials grant access to a Bedrock model in your selected region (e.g., `us-west-2`).
34+
35+
```bash
36+
export AWS_REGION=us-west-2
37+
# plus AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY or an SSO profile
38+
```
39+
40+
You can pick a specific model by passing it to `BedrockModel(model_id="...")` in each sample's worker.
41+
42+
3. Start a [Temporal dev server](https://docs.temporal.io/cli#start-dev-server):
43+
44+
```bash
45+
temporal server start-dev
46+
```
47+
48+
## Running a Sample
49+
50+
Each sample has two scripts. Start the Worker first, then the Workflow starter in a separate terminal:
51+
52+
```bash
53+
# Terminal 1: start the Worker
54+
uv run strands_plugin/<sample>/run_worker.py
55+
56+
# Terminal 2: start the Workflow
57+
uv run strands_plugin/<sample>/run_workflow.py
58+
```
59+
60+
For example, to run the tools sample:
61+
62+
```bash
63+
# Terminal 1
64+
uv run strands_plugin/tools/run_worker.py
65+
66+
# Terminal 2
67+
uv run strands_plugin/tools/run_workflow.py
68+
```
69+
70+
## Key Features Demonstrated
71+
72+
- **Durable model invocation** — every model call runs in an `invoke_model` activity with configurable timeouts and retries.
73+
- **Three ways to define tools** — pure Strands `@tool`, custom Temporal activities, and ecosystem `strands_tools` wrapped as activities.
74+
- **Human-in-the-loop** — both hook-based (`BeforeToolCallEvent.interrupt()`) and tool-body (`raise InterruptException`) styles.
75+
- **Hook system** — deterministic in-workflow callbacks plus I/O callbacks dispatched via `activity_as_hook`.
76+
- **MCP integration** — connect to MCP servers at worker startup; tool calls dispatched through per-server activities.
77+
- **Structured output** — Pydantic-typed agent results via the plugin's `pydantic_data_converter`.
78+
- **Streaming** — forward model chunks live to external subscribers.
79+
- **Long-lived chats** — hand off `agent.messages` via `continue-as-new` to stay under Temporal's history limit.
80+
81+
## Related
82+
83+
- [Temporal Strands plugin docs](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/strands)
84+
- [Strands Agents](https://strandsagents.com/)

strands_plugin/__init__.py

Whitespace-only changes.
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# Activity Interrupt
2+
3+
A `@activity.defn`-wrapped tool raises `InterruptException(Interrupt(...))` directly. The plugin's failure converter preserves the `Interrupt` payload across the activity boundary, so the agent stops with `stop_reason == "interrupt"` just like in the hook-based [human_in_the_loop](../human_in_the_loop) sample.
4+
5+
When to reach for this style instead of a hook:
6+
7+
- The decision to pause depends on data that's only visible inside the activity (a permissions service, a row in a database, etc.).
8+
- You don't want to load that data into workflow context just to make the call.
9+
10+
## What This Sample Demonstrates
11+
12+
- Raising `InterruptException` from a Temporal activity tool
13+
- The plugin's failure converter carrying `Interrupt` across the activity boundary
14+
- Why `StrandsPlugin` must be attached to the **client** (not just the worker)
15+
16+
## Running the Sample
17+
18+
```bash
19+
# Terminal 1
20+
uv run strands_plugin/activity_interrupt/run_worker.py
21+
22+
# Terminal 2
23+
uv run strands_plugin/activity_interrupt/run_workflow.py
24+
```
25+
26+
The starter requests deletion of a "protected" resource. The `delete_thing` activity raises an interrupt for protected names; the starter signals `"approve"` to release it.
27+
28+
## Files
29+
30+
| File | Description |
31+
|------|-------------|
32+
| `workflow.py` | `delete_thing` activity that raises `InterruptException`, plus the workflow that handles resumption |
33+
| `run_worker.py` | `StrandsPlugin` on the client + worker, registers the activity |
34+
| `run_workflow.py` | Starts the workflow and signals approval |

strands_plugin/activity_interrupt/__init__.py

Whitespace-only changes.
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
"""Worker for the activity interrupt sample."""
2+
3+
# @@@SNIPSTART python-strands-activity-interrupt-worker
4+
import asyncio
5+
import os
6+
7+
from temporalio.client import Client
8+
from temporalio.contrib.strands import StrandsPlugin
9+
from temporalio.worker import Worker
10+
11+
from strands_plugin.activity_interrupt.workflow import (
12+
ActivityInterruptWorkflow,
13+
delete_thing,
14+
)
15+
16+
17+
async def main() -> None:
18+
plugin = StrandsPlugin()
19+
# The plugin MUST be on the client so its failure converter is installed.
20+
client = await Client.connect(
21+
os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"),
22+
plugins=[plugin],
23+
)
24+
25+
worker = Worker(
26+
client,
27+
task_queue="strands-activity-interrupt",
28+
workflows=[ActivityInterruptWorkflow],
29+
activities=[delete_thing],
30+
)
31+
print("Worker started. Ctrl+C to exit.")
32+
await worker.run()
33+
34+
35+
if __name__ == "__main__":
36+
asyncio.run(main())
37+
# @@@SNIPEND
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
"""Start the activity interrupt workflow."""
2+
3+
import asyncio
4+
import os
5+
6+
from temporalio.client import Client
7+
from temporalio.contrib.strands import StrandsPlugin
8+
9+
from strands_plugin.activity_interrupt.workflow import ActivityInterruptWorkflow
10+
11+
12+
async def main() -> None:
13+
# The starter also goes through the plugin's failure converter so the
14+
# Interrupt payload deserializes cleanly when the workflow result is read.
15+
client = await Client.connect(
16+
os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"),
17+
plugins=[StrandsPlugin()],
18+
)
19+
20+
handle = await client.start_workflow(
21+
ActivityInterruptWorkflow.run,
22+
"Please delete the 'system' user.",
23+
id="strands-activity-interrupt",
24+
task_queue="strands-activity-interrupt",
25+
)
26+
27+
reason = None
28+
while reason is None:
29+
await asyncio.sleep(0.5)
30+
reason = await handle.query(ActivityInterruptWorkflow.pending_approval)
31+
print(f"Approval requested: {reason}")
32+
33+
await handle.signal(ActivityInterruptWorkflow.approve, "approve")
34+
35+
result = await handle.result()
36+
print(f"Result: {result}")
37+
38+
39+
if __name__ == "__main__":
40+
asyncio.run(main())
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
"""Activity interrupt: ``InterruptException`` raised from a Temporal activity.
2+
3+
The plugin's failure converter preserves the ``Interrupt`` payload across the
4+
activity boundary, so a Temporal activity can pause the agent for human input
5+
the same way a hook can.
6+
7+
For this to work, ``StrandsPlugin`` must be attached to the **client** (not
8+
just the worker) so the failure converter is installed on the data converter.
9+
The worker in this sample does exactly that.
10+
"""
11+
12+
from datetime import timedelta
13+
from typing import Optional
14+
15+
from strands.interrupt import Interrupt, InterruptException
16+
from strands.types.interrupt import InterruptResponseContent
17+
from temporalio import activity, workflow
18+
from temporalio.contrib.strands import TemporalAgent
19+
from temporalio.contrib.strands.workflow import activity_as_tool
20+
21+
# Tracks names that have been approved out-of-band. In a real system, this
22+
# would be a row in a policy database; the human reviewer flips a flag during
23+
# the pause, and the activity's next attempt reads the new value and proceeds.
24+
_APPROVED: set[str] = set()
25+
26+
27+
# @@@SNIPSTART python-strands-activity-interrupt-activity
28+
@activity.defn
29+
async def delete_thing(name: str) -> str:
30+
if name not in _APPROVED:
31+
_APPROVED.add(name)
32+
raise InterruptException(
33+
Interrupt(
34+
id=f"delete:{name}",
35+
name="approval",
36+
reason=f"approve delete of protected resource '{name}'?",
37+
)
38+
)
39+
return f"deleted {name}"
40+
41+
42+
# @@@SNIPEND
43+
44+
45+
@workflow.defn
46+
class ActivityInterruptWorkflow:
47+
def __init__(self) -> None:
48+
self.agent = TemporalAgent(
49+
start_to_close_timeout=timedelta(seconds=60),
50+
tools=[
51+
activity_as_tool(
52+
delete_thing,
53+
start_to_close_timeout=timedelta(seconds=30),
54+
),
55+
],
56+
)
57+
self._approval: Optional[str] = None
58+
self._pending_reason: Optional[str] = None
59+
60+
@workflow.signal
61+
def approve(self, response: str) -> None:
62+
self._approval = response
63+
64+
@workflow.query
65+
def pending_approval(self) -> Optional[str]:
66+
return self._pending_reason
67+
68+
@workflow.run
69+
async def run(self, prompt: str) -> str:
70+
result = await self.agent.invoke_async(prompt)
71+
while result.stop_reason == "interrupt":
72+
interrupts = list(result.interrupts or [])
73+
self._pending_reason = interrupts[0].reason if interrupts else None
74+
await workflow.wait_condition(lambda: self._approval is not None)
75+
response = self._approval
76+
self._approval = None
77+
self._pending_reason = None
78+
responses: list[InterruptResponseContent] = [
79+
{"interruptResponse": {"interruptId": i.id, "response": response}}
80+
for i in interrupts
81+
]
82+
result = await self.agent.invoke_async(responses)
83+
return str(result)
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Continue-as-new
2+
3+
A chat-style workflow accumulates history with every turn and will eventually hit Temporal's per-workflow history limit. `workflow.info().is_continue_as_new_suggested()` flips `True` once the server decides history has grown large enough; this sample checks it after each turn and hands off to a fresh run with `agent.messages` as input.
4+
5+
## What This Sample Demonstrates
6+
7+
- Driving a multi-turn chat with **updates**, so each caller gets the assistant's reply back from the same call
8+
- Seeding a new `TemporalAgent` with prior `agent.messages`
9+
- Using `workflow.info().is_continue_as_new_suggested()` + `workflow.continue_as_new(...)` to keep the workflow alive indefinitely
10+
- Draining in-flight update handlers with `workflow.all_handlers_finished` before continue-as-new
11+
12+
## Running the Sample
13+
14+
```bash
15+
# Terminal 1
16+
uv run strands_plugin/continue_as_new/run_worker.py
17+
18+
# Terminal 2
19+
uv run strands_plugin/continue_as_new/run_workflow.py
20+
```
21+
22+
The starter calls the `turn` update for each user message and prints the assistant's reply, then signals `end_chat`. In a real chatbot, a UI would drive the updates and the workflow would run indefinitely, continuing-as-new whenever history gets large.
23+
24+
## Files
25+
26+
| File | Description |
27+
|------|-------------|
28+
| `workflow.py` | `ChatInput`, `ChatWorkflow` with `turn` update, `end_chat` signal, and `messages` query |
29+
| `run_worker.py` | Registers `StrandsPlugin`, starts the worker |
30+
| `run_workflow.py` | Starts the chat, sends a few turns, ends it |

0 commit comments

Comments
 (0)