diff --git a/examples/agentframework_hitl.py b/examples/agentframework_hitl.py index 774baa5..9dd199b 100644 --- a/examples/agentframework_hitl.py +++ b/examples/agentframework_hitl.py @@ -2,26 +2,27 @@ import asyncio import os -from collections.abc import AsyncIterable -from dataclasses import dataclass +from dataclasses import dataclass, field from agent_framework import ( AgentExecutorRequest, AgentExecutorResponse, - AgentResponseUpdate, + AgentRunResponse, + ChatAgent, + ChatMessage, Executor, - Message, + RequestInfoEvent, + Role, WorkflowBuilder, WorkflowContext, - WorkflowEvent, + WorkflowOutputEvent, handler, response_handler, - tool, ) from agent_framework.openai import OpenAIChatClient from azure.identity.aio import DefaultAzureCredential, get_bearer_token_provider from dotenv import load_dotenv -from pydantic import BaseModel +from typing_extensions import Never # Configure OpenAI client based on environment load_dotenv(override=True) @@ -52,212 +53,214 @@ client = OpenAIChatClient(api_key=os.environ["OPENAI_API_KEY"], model_id=os.environ.get("OPENAI_MODEL", "gpt-4o")) """ -Sample: Human in the loop guessing game +Sample: Human-in-the-loop workflow -An agent guesses a number, then a human guides it with higher, lower, or -correct. The loop continues until the human confirms correct, at which point -the workflow completes when idle with no pending work. +Pipeline layout: +writer_agent -> Coordinator -> writer_agent +-> Coordinator -> final_editor_agent -> Coordinator -> output -Purpose: -Show how to integrate a human step in the middle of an LLM workflow by using -`request_info` and `run(responses=..., stream=True)`. +The writer agent drafts content based on a user-provided topic. A custom executor +packages the draft and emits a RequestInfoEvent so a human can comment, then replays the human +guidance back into the conversation before the final editor agent produces the polished output. -Demonstrate: -- Alternating turns between an AgentExecutor and a human, driven by events. -- Using Pydantic response_format to enforce structured JSON output from the agent instead of regex parsing. -- Driving the loop in application code with run and responses parameter. - -Prerequisites: -- Azure OpenAI configured for AzureOpenAIChatClient with required environment variables. -- Authentication via azure-identity. Use AzureCliCredential and run az login before executing the sample. -- Basic familiarity with WorkflowBuilder, executors, edges, events, and streaming runs. +Demonstrates: +- Capturing the writer's output for human review. +- Human-in-the-loop feedback that can approve or request revisions. """ -# How human-in-the-loop is achieved via `request_info` and `run(responses=..., stream=True)`: -# - An executor (TurnManager) calls `ctx.request_info` with a payload (HumanFeedbackRequest). -# - The workflow run pauses and emits a with the payload and the request_id. -# - The application captures the event, prompts the user, and collects replies. -# - The application calls `run(stream=True, responses=...)` with a map of request_ids to replies. -# - The workflow resumes, and the response is delivered to the executor method decorated with @response_handler. -# - The executor can then continue the workflow, e.g., by sending a new message to the agent. - @dataclass -class HumanFeedbackRequest: - """Request sent to the human for feedback on the agent's guess.""" - - prompt: str - +class DraftFeedbackRequest: + """Payload sent for human review.""" -class GuessOutput(BaseModel): - """Structured output from the agent. Enforced via response_format for reliable parsing.""" + prompt: str = "" + draft_text: str = "" + conversation: list[ChatMessage] = field(default_factory=list) # type: ignore[reportUnknownVariableType] - guess: int +class Coordinator(Executor): + """Bridge between the writer agent, human feedback, and final editor.""" -class TurnManager(Executor): - """Coordinates turns between the agent and the human. - - Responsibilities: - - Kick off the first agent turn. - - After each agent reply, request human feedback with a HumanFeedbackRequest. - - After each human reply, either finish the game or prompt the agent again with feedback. - """ - - def __init__(self, id: str | None = None): - super().__init__(id=id or "turn_manager") + def __init__(self, id: str, writer_id: str, final_editor_id: str) -> None: + super().__init__(id) + self.writer_id = writer_id + self.final_editor_id = final_editor_id @handler - async def start(self, _: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None: - """Start the game by asking the agent for an initial guess. - - Contract: - - Input is a simple starter token (ignored here). - - Output is an AgentExecutorRequest that triggers the agent to produce a guess. - """ - user = Message("user", text="Start by making your first guess.") - await ctx.send_message(AgentExecutorRequest(messages=[user], should_respond=True)) - - @handler - async def on_agent_response( + async def on_writer_response( self, - result: AgentExecutorResponse, - ctx: WorkflowContext, + draft: AgentExecutorResponse, + ctx: WorkflowContext[Never, AgentRunResponse], ) -> None: - """Handle the agent's guess and request human guidance. + """Handle responses from the other two agents in the workflow.""" + if draft.executor_id == self.final_editor_id: + # Final editor response; yield output directly. + await ctx.yield_output(draft.agent_run_response) + return - Steps: - 1) Parse the agent's JSON into GuessOutput for robustness. - 2) Request info with a HumanFeedbackRequest as the payload. - """ - # Parse structured model output - text = result.agent_response.text - last_guess = GuessOutput.model_validate_json(text).guess + # Writer agent response; request human feedback. + # Preserve the full conversation so the final editor + # can see tool traces and the initial prompt. + conversation: list[ChatMessage] + if draft.full_conversation is not None: + conversation = list(draft.full_conversation) + else: + conversation = list(draft.agent_run_response.messages) + draft_text = draft.agent_run_response.text.strip() + if not draft_text: + draft_text = "No draft text was produced." - # Craft a precise human prompt that defines higher and lower relative to the agent's guess. prompt = ( - f"The agent guessed: {last_guess}. " - "Type one of: higher (your number is higher than this guess), " - "lower (your number is lower than this guess), correct, or exit." + "Review the draft from the writer and provide a short directional note " + "(tone tweaks, must-have detail, target audience, etc.). " + "Keep it under 30 words." ) - # Send a request with a prompt as the payload and expect a string reply. await ctx.request_info( - request_data=HumanFeedbackRequest(prompt=prompt), + request_data=DraftFeedbackRequest(prompt=prompt, draft_text=draft_text, conversation=conversation), response_type=str, ) @response_handler async def on_human_feedback( self, - original_request: HumanFeedbackRequest, + original_request: DraftFeedbackRequest, feedback: str, - ctx: WorkflowContext[AgentExecutorRequest, str], + ctx: WorkflowContext[AgentExecutorRequest], ) -> None: - """Continue the game or finish based on human feedback.""" - reply = feedback.strip().lower() - - if reply == "correct": - await ctx.yield_output("Guessed correctly!") + note = feedback.strip() + if note.lower() == "approve": + # Human approved the draft as-is; forward it unchanged. + await ctx.send_message( + AgentExecutorRequest( + messages=original_request.conversation + + [ChatMessage(Role.USER, text="The draft is approved as-is.")], + should_respond=True, + ), + target_id=self.final_editor_id, + ) return - # Provide feedback to the agent to try again. - # response_format=GuessOutput on the agent ensures JSON output, so we just need to guide the logic. - last_guess = original_request.prompt.split(": ")[1].split(".")[0] - feedback_text = ( - f"Feedback: {reply}. Your last guess was {last_guess}. " - f"Use this feedback to adjust and make your next guess (1-10)." + # Human provided feedback; prompt the writer to revise. + conversation: list[ChatMessage] = list(original_request.conversation) + instruction = ( + "A human reviewer shared the following guidance:\n" + f"{note or 'No specific guidance provided.'}\n\n" + "Rewrite the draft from the previous assistant message into a polished final version. " + "Keep the response under 120 words and reflect any requested tone adjustments." + ) + conversation.append(ChatMessage(Role.USER, text=instruction)) + await ctx.send_message( + AgentExecutorRequest(messages=conversation, should_respond=True), target_id=self.writer_id ) - user_msg = Message("user", text=feedback_text) - await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True)) - - -async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, str] | None: - """Process events from the workflow stream to capture human feedback requests.""" - # Track the last author to format streaming output. - last_response_id: str | None = None - - requests: list[tuple[str, HumanFeedbackRequest]] = [] - async for event in stream: - if event.type == "request_info" and isinstance(event.data, HumanFeedbackRequest): - requests.append((event.request_id, event.data)) - elif event.type == "output": - if isinstance(event.data, AgentResponseUpdate): - update = event.data - response_id = update.response_id - if response_id != last_response_id: - if last_response_id is not None: - print() # Newline between different responses - print(f"{update.author_name}: {update.text}", end="", flush=True) - last_response_id = response_id - else: - print(update.text, end="", flush=True) - else: - print(f"\n{event.executor_id}: {event.data}") - - # Handle any pending human feedback requests. - if requests: - responses: dict[str, str] = {} - for request_id, request in requests: - print(f"\nHITL: {request.prompt}") - # Instructional print already appears above. The input line below is the user entry point. - # If desired, you can add more guidance here, but keep it concise. - answer = input("Enter higher/lower/correct/exit: ").lower() # noqa: ASYNC250 - if answer == "exit": - print("Exiting...") - return None - responses[request_id] = answer - return responses - - return None -async def main() -> None: - """Run the human-in-the-loop guessing game workflow.""" - # Create agent and executor - guessing_agent = client.as_agent( - name="GuessingAgent", +def create_writer_agent() -> ChatAgent: + """Creates a writer agent.""" + return client.create_agent( + name="writer_agent", + instructions=( + "You are an excellent content writer. " + "Create clear, engaging content based on the user's request. " + "Focus on clarity, accuracy, and proper structure. " + "Keep your drafts concise (3-5 sentences)." + ), + ) + + +def create_final_editor_agent() -> ChatAgent: + """Creates a final editor agent.""" + return client.create_agent( + name="final_editor_agent", instructions=( - "You guess a number between 1 and 10. " - "If the user says 'higher' or 'lower', adjust your next guess. " - 'You MUST return ONLY a JSON object exactly matching this schema: {"guess": }. ' - "No explanations or additional text." + "You are an editor who polishes marketing copy after human approval. " + "Correct any legal or factual issues. Return the final version even if no changes are made. " ), - # response_format enforces that the model produces JSON compatible with GuessOutput. - default_options={"response_format": GuessOutput}, ) - turn_manager = TurnManager(id="turn_manager") - - # Build a simple loop: TurnManager <-> AgentExecutor. - workflow = ( - WorkflowBuilder(start_executor=turn_manager) - .add_edge(turn_manager, guessing_agent) # Ask agent to make/adjust a guess - .add_edge(guessing_agent, turn_manager) # Agent's response comes back to coordinator - ).build() - - # Initiate the first run of the workflow. - # Runs are not isolated; state is preserved across multiple calls to run. - stream = workflow.run("start", stream=True) - - pending_responses = await process_event_stream(stream) - while pending_responses is not None: - # Run the workflow until there is no more human feedback to provide, - # in which case this workflow completes. - stream = workflow.run(stream=True, responses=pending_responses) - pending_responses = await process_event_stream(stream) - - """ - Sample Output: - - HITL> The agent guessed: 5. Type one of: higher (your number is higher than this guess), lower (your number is lower than this guess), correct, or exit. - Enter higher/lower/correct/exit: higher - HITL> The agent guessed: 8. Type one of: higher (your number is higher than this guess), lower (your number is lower than this guess), correct, or exit. - Enter higher/lower/correct/exit: higher - HITL> The agent guessed: 10. Type one of: higher (your number is higher than this guess), lower (your number is lower than this guess), correct, or exit. - Enter higher/lower/correct/exit: lower - HITL> The agent guessed: 9. Type one of: higher (your number is higher than this guess), lower (your number is lower than this guess), correct, or exit. - Enter higher/lower/correct/exit: correct - Workflow output: Guessed correctly: 9 - """ # noqa: E501 + + +def build_workflow(): + """Build and return the workflow.""" + return ( + WorkflowBuilder() + .register_agent(create_writer_agent, name="writer_agent") + .register_agent(create_final_editor_agent, name="final_editor_agent") + .register_executor( + lambda: Coordinator( + id="coordinator", + writer_id="writer_agent", + final_editor_id="final_editor_agent", + ), + name="coordinator", + ) + .set_start_executor("writer_agent") + .add_edge("writer_agent", "coordinator") + .add_edge("coordinator", "writer_agent") + .add_edge("final_editor_agent", "coordinator") + .add_edge("coordinator", "final_editor_agent") + .build() + ) + + +async def main() -> None: + """Run the workflow and bridge human feedback between two agents.""" + + # Build the workflow. + workflow = build_workflow() + + # Prompt user for what to write about + print("What would you like the writer to create content about?") + topic = input("Topic: ").strip() + + print( + "\nInteractive mode. When prompted, provide a short feedback note for the editor.", + flush=True, + ) + + pending_responses: dict[str, str] | None = None + completed = False + initial_run = True + + while not completed: + if initial_run: + stream = workflow.run_stream(f"Write a short piece about: {topic}") + initial_run = False + elif pending_responses is not None: + stream = workflow.send_responses_streaming(pending_responses) + pending_responses = None + else: + break + + requests: list[tuple[str, DraftFeedbackRequest]] = [] + + async for event in stream: + if isinstance(event, RequestInfoEvent) and isinstance(event.data, DraftFeedbackRequest): + # Stash the request so we can prompt the human after the stream completes. + requests.append((event.request_id, event.data)) + elif isinstance(event, WorkflowOutputEvent): + response = event.data + print("\n===== Final output =====") + final_text = getattr(response, "text", str(response)) + print(final_text.strip()) + completed = True + + if requests and not completed: + responses: dict[str, str] = {} + for request_id, request in requests: + print("\n----- Writer draft -----") + print(request.draft_text.strip()) + print("\nProvide guidance for the editor (or 'approve' to accept the draft).") + answer = input("Human feedback: ").strip() # noqa: ASYNC250 + if answer.lower() == "exit": + print("Exiting...") + return + responses[request_id] = answer + pending_responses = responses + + print("Workflow complete.") + + # Close the async credential if it was created + if async_credential is not None: + await async_credential.close() if __name__ == "__main__": diff --git a/requirements.txt b/requirements.txt index 5f00afc..1b1add2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,7 +11,7 @@ semantic-kernel langgraph langchain==1.0.2 langchain_openai==1.0.1 -pydantic-ai==1.0.8 +pydantic-ai==1.56.0 llama-index llama-index-llms-azure-openai llama-index-llms-openai-like