[BREAKING] Python: Merge send_responses into run method (#3720)

* Streamline workflow run api with send responses in one method

* Fixes

* Address copilot feedback
This commit is contained in:
Evan Mattson
2026-02-07 07:32:38 +09:00
committed by GitHub
Unverified
parent 15256bb616
commit a17f13598b
39 changed files with 561 additions and 335 deletions
@@ -193,7 +193,7 @@ async def run_agent_framework() -> None:
current_executor = None
stream_line_open = False
async for event in workflow.send_responses_streaming(responses):
async for event in workflow.run(stream=True, responses=responses):
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
# Print executor name header when switching to a new agent
if current_executor != event.executor_id:
@@ -208,9 +208,9 @@ async def _run_workflow(workflow: Workflow, user_inputs: list[str]) -> None:
responses = {req.request_id: HandoffAgentUserRequest.terminate() for req in pending_requests}
# Send responses and get new events
# We use send_responses_streaming() to get events as they occur, allowing us to
# display agent responses in real-time and handle new requests as they arrive
workflow_result = await workflow.send_responses(responses)
# We use run(responses=...) to get events, allowing us to
# display agent responses and handle new requests as they arrive
workflow_result = await workflow.run(responses=responses)
pending_requests = _handle_events(workflow_result)
@@ -255,9 +255,9 @@ async def main() -> None:
}
# Send responses and get new events
# We use send_responses() to get events from the workflow, allowing us to
# We use run(responses=...) to get events from the workflow, allowing us to
# display agent responses and handle new requests as they arrive
events = await workflow.send_responses(responses)
events = await workflow.run(responses=responses)
pending_requests = _handle_events(events)
"""
@@ -191,7 +191,7 @@ async def main() -> None:
print(f"\nUser: {user_input}")
responses = {request.request_id: HandoffAgentUserRequest.create_response(user_input)}
events = await _drain(workflow.send_responses_streaming(responses))
events = await _drain(workflow.run(stream=True, responses=responses))
requests, file_ids = _handle_events(events)
all_file_ids.extend(file_ids)
input_index += 1
@@ -29,7 +29,7 @@ Concepts highlighted here:
must keep stable IDs so the checkpoint state aligns when we rebuild the graph.
2. **Executor snapshotting** - checkpoints capture the pending plan-review request
map, at superstep boundaries.
3. **Resume with responses** - `Workflow.send_responses_streaming` accepts a
3. **Resume with responses** - `Workflow.run(responses=...)` accepts a
`responses` mapping so we can inject the stored human reply during restoration.
Prerequisites:
@@ -157,7 +157,7 @@ async def main() -> None:
# Supply the approval and continue to run to completion.
final_event: WorkflowEvent | None = None
async for event in resumed_workflow.send_responses_streaming({request_info_event.request_id: approval}):
async for event in resumed_workflow.run(stream=True, responses={request_info_event.request_id: approval}):
if event.type == "output":
final_event = event
@@ -139,14 +139,14 @@ async def main() -> None:
print("=" * 60)
# Initiate the first run of the workflow.
# Runs are not isolated; state is preserved across multiple calls to run or send_responses_streaming.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(task, stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
# Run the workflow until there is no more human feedback to provide,
# in which case this workflow completes.
stream = workflow.send_responses_streaming(pending_responses)
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
@@ -281,7 +281,7 @@ async def main() -> None:
)
initial_run = False
elif pending_responses is not None:
stream = workflow.send_responses_streaming(pending_responses)
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = None
else:
break
@@ -249,7 +249,7 @@ async def run_interactive_session(
while True:
if responses:
event_stream = workflow.send_responses_streaming(responses)
event_stream = workflow.run(stream=True, responses=responses)
requests.clear()
responses = None
else:
@@ -23,22 +23,24 @@ from azure.identity import AzureCliCredential
"""
Sample: Handoff Workflow with Tool Approvals + Checkpoint Resume
Demonstrates the two-step pattern for resuming a handoff workflow from a checkpoint
while handling both HandoffAgentUserRequest prompts and function approval request Content
for tool calls (e.g., submit_refund).
Demonstrates resuming a handoff workflow from a checkpoint while handling both
HandoffAgentUserRequest prompts and function approval request Content for tool calls
(e.g., submit_refund).
Scenario:
1. User starts a conversation with the workflow.
2. Agents may emit user input requests or tool approval requests.
3. Workflow writes a checkpoint capturing pending requests and pauses.
4. Process can exit/restart.
5. On resume: Load the checkpoint, surface pending approvals/user prompts, and provide responses.
5. On resume: Restore checkpoint, inspect pending requests, then provide responses.
6. Workflow continues from the saved state.
Pattern:
- Step 1: workflow.run(checkpoint_id=..., stream=True) to restore checkpoint and pending requests.
- Step 2: workflow.send_responses_streaming(responses) to supply human replies and approvals.
- Two-step approach is required because send_responses_streaming does not accept checkpoint_id.
- workflow.run(checkpoint_id=..., stream=True) to restore checkpoint and discover pending requests.
- workflow.run(stream=True, responses=responses) to supply human replies and approvals.
(Two steps are needed here because the sample must inspect request types before building responses.
When response payloads are already known, use the single-call form:
workflow.run(stream=True, checkpoint_id=..., responses=responses).)
Prerequisites:
- Azure CLI authentication (az login).
@@ -228,13 +230,13 @@ async def resume_with_responses(
approve_tools: bool | None = None,
) -> tuple[list[WorkflowEvent], str | None]:
"""
Two-step resume pattern (answers customer questions and tool approvals):
Resume from checkpoint and send responses.
Step 1: Restore checkpoint to load pending requests into workflow state
Step 2: Send user responses using send_responses_streaming
Step 1: Restore checkpoint to discover pending request types.
Step 2: Build typed responses and send via workflow.run(responses=...).
This is the current pattern required because send_responses_streaming
doesn't accept a checkpoint_id parameter.
When response payloads are already known, these can be combined into a single
workflow.run(stream=True, checkpoint_id=..., responses=...) call.
"""
print(f"\n{'=' * 60}")
print("RESUMING WORKFLOW WITH HUMAN INPUT")
@@ -253,10 +255,9 @@ async def resume_with_responses(
checkpoints.sort(key=lambda cp: cp.timestamp, reverse=True)
latest_checkpoint = checkpoints[0]
print(f"Step 1: Restoring checkpoint {latest_checkpoint.checkpoint_id}")
print(f"Restoring checkpoint {latest_checkpoint.checkpoint_id}")
# Step 1: Restore the checkpoint to load pending requests into memory
# The checkpoint restoration re-emits pending request_info events
# First, restore checkpoint to discover pending requests
restored_requests: list[WorkflowEvent] = []
async for event in workflow.run(checkpoint_id=latest_checkpoint.checkpoint_id, stream=True): # type: ignore[attr-defined]
if event.type == "request_info":
@@ -274,11 +275,11 @@ async def resume_with_responses(
user_response=user_response,
approve_tools=approve_tools,
)
print(f"Step 2: Sending responses for {len(responses)} request(s)")
print(f"Sending responses for {len(responses)} request(s)")
new_pending_requests: list[WorkflowEvent] = []
async for event in workflow.send_responses_streaming(responses):
async for event in workflow.run(stream=True, responses=responses):
if event.type == "status":
print(f"[Status] {event.state}")
@@ -309,7 +310,7 @@ async def main() -> None:
This sample shows:
1. Starting a workflow and getting a HandoffAgentUserRequest
2. Pausing (checkpoint is saved automatically)
3. Resuming from checkpoint with a user response or tool approval (two-step pattern)
3. Resuming from checkpoint with a user response or tool approval
4. Continuing the conversation until completion
"""
@@ -380,7 +380,7 @@ async def main() -> None:
approval_response = "approve"
output_event: WorkflowEvent | None = None
async for event in workflow2.send_responses_streaming({request_info_event.request_id: approval_response}):
async for event in workflow2.run(stream=True, responses={request_info_event.request_id: approval_response}):
if event.type == "output":
output_event = event
@@ -347,7 +347,7 @@ async def main() -> None:
else:
print(f"Unknown request info event data type: {type(event.data)}")
run_result = await main_workflow.send_responses(responses)
run_result = await main_workflow.run(responses=responses)
outputs = run_result.get_outputs()
if outputs:
@@ -251,7 +251,7 @@ async def main() -> None:
# Continue workflow with user response
print(f"\n{YELLOW}WORKFLOW:{RESET} Restore\n")
response = AgentExternalInputResponse(user_input=user_input)
stream = workflow.send_responses_streaming({pending_request_id: response})
stream = workflow.run(stream=True, responses={pending_request_id: response})
pending_request_id = None
else:
# Start workflow
@@ -90,7 +90,7 @@ async def main():
while True:
if pending_request_id:
response = ExternalInputResponse(user_input=user_input)
stream = workflow.send_responses_streaming({pending_request_id: response})
stream = workflow.run(stream=True, responses={pending_request_id: response})
else:
stream = workflow.run({"userInput": user_input}, stream=True)
@@ -199,7 +199,7 @@ async def main() -> None:
)
# Initiate the first run of the workflow.
# Runs are not isolated; state is preserved across multiple calls to run or send_responses_streaming.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(
"Create a short launch blurb for the LumenX desk lamp. Emphasize adjustability and warm lighting.",
stream=True,
@@ -209,7 +209,7 @@ async def main() -> None:
while pending_responses is not None:
# Run the workflow until there is no more human feedback to provide,
# in which case this workflow completes.
stream = workflow.send_responses_streaming(pending_responses)
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
print("\nWorkflow complete.")
@@ -249,7 +249,7 @@ async def main() -> None:
)
# Initiate the first run of the workflow.
# Runs are not isolated; state is preserved across multiple calls to run or send_responses_streaming.
# Runs are not isolated; state is preserved across multiple calls to run.
events = await workflow.run(incoming_email)
request_info_events = events.get_request_info_events()
@@ -276,7 +276,7 @@ async def main() -> None:
print("Performing automatic approval for demo purposes...")
responses[request_info_event.request_id] = data.to_function_approval_response(approved=True)
events = await workflow.send_responses(responses)
events = await workflow.run(responses=responses)
request_info_events = events.get_request_info_events()
# The output should only come from conclude_workflow executor and it's a single string
@@ -183,14 +183,14 @@ async def main() -> None:
)
# Initiate the first run of the workflow.
# Runs are not isolated; state is preserved across multiple calls to run or send_responses_streaming.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run("Analyze the impact of large language models on software development.", stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
# Run the workflow until there is no more human feedback to provide,
# in which case this workflow completes.
stream = workflow.send_responses_streaming(pending_responses)
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
@@ -147,7 +147,7 @@ async def main() -> None:
)
# Initiate the first run of the workflow.
# Runs are not isolated; state is preserved across multiple calls to run or send_responses_streaming.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(
"Discuss how our team should approach adopting AI tools for productivity. "
"Consider benefits, risks, and implementation strategies.",
@@ -158,7 +158,7 @@ async def main() -> None:
while pending_responses is not None:
# Run the workflow until there is no more human feedback to provide,
# in which case this workflow completes.
stream = workflow.send_responses_streaming(pending_responses)
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
@@ -29,7 +29,7 @@ the workflow completes when idle with no pending work.
Purpose:
Show how to integrate a human step in the middle of an LLM workflow by using
`request_info` and `send_responses_streaming`.
`request_info` and `run(responses=..., stream=True)`.
Demonstrate:
- Alternating turns between an AgentExecutor and a human, driven by events.
@@ -42,11 +42,11 @@ Prerequisites:
- Basic familiarity with WorkflowBuilder, executors, edges, events, and streaming runs.
"""
# How human-in-the-loop is achieved via `request_info` and `send_responses_streaming`:
# How human-in-the-loop is achieved via `request_info` and `run(responses=..., stream=True)`:
# - An executor (TurnManager) calls `ctx.request_info` with a payload (HumanFeedbackRequest).
# - The workflow run pauses and emits a `request_info` event with the payload and the request_id.
# - The application captures the event, prompts the user, and collects replies.
# - The application calls `send_responses_streaming` with a map of request_ids to replies.
# - The application calls `run(stream=True, responses=...)` with a map of request_ids to replies.
# - The workflow resumes, and the response is delivered to the executor method decorated with @response_handler.
# - The executor can then continue the workflow, e.g., by sending a new message to the agent.
@@ -205,14 +205,14 @@ async def main() -> None:
).build()
# Initiate the first run of the workflow.
# Runs are not isolated; state is preserved across multiple calls to run or send_responses_streaming.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run("start", stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
# Run the workflow until there is no more human feedback to provide,
# in which case this workflow completes.
stream = workflow.send_responses_streaming(pending_responses)
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
"""
@@ -13,8 +13,8 @@ using the standard request_info pattern for consistency.
Demonstrate:
- Configuring request info with `.with_request_info()`
- Handling with AgentInputRequest data
- Injecting responses back into the workflow via send_responses_streaming
- Handling request_info events with AgentInputRequest data
- Injecting responses back into the workflow via run(responses=..., stream=True)
Prerequisites:
- Azure OpenAI configured for AzureOpenAIChatClient with required environment variables
@@ -122,14 +122,14 @@ async def main() -> None:
)
# Initiate the first run of the workflow.
# Runs are not isolated; state is preserved across multiple calls to run or send_responses_streaming.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run("Write a brief introduction to artificial intelligence.", stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
# Run the workflow until there is no more human feedback to provide,
# in which case this workflow completes.
stream = workflow.send_responses_streaming(pending_responses)
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
@@ -0,0 +1,144 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import json
from typing import cast
from agent_framework import (
AgentRunUpdateEvent,
ChatAgent,
ChatMessage,
MagenticBuilder,
MagenticPlanReviewRequest,
WorkflowEvent,
)
from agent_framework.openai import OpenAIChatClient
"""
Sample: Magentic Orchestration with Human Plan Review
This sample demonstrates how humans can review and provide feedback on plans
generated by the Magentic workflow orchestrator. When plan review is enabled,
the workflow requests human approval or revision before executing each plan.
Key concepts:
- with_plan_review(): Enables human review of generated plans
- MagenticPlanReviewRequest: The event type for plan review requests
- The human reviewer can either approve the plan or provide revision feedback
Plan review options:
- approve(): Accept the proposed plan and continue execution
- revise(feedback): Provide textual feedback to modify the plan
Prerequisites:
- OpenAI credentials configured for `OpenAIChatClient`.
"""
async def main() -> None:
    """Run the Magentic plan-review sample end to end.

    Builds a Magentic workflow from two participant agents and a manager agent
    with plan review enabled, then streams it. Whenever a
    ``MagenticPlanReviewRequest`` is received, the proposed plan is printed and
    the user is prompted on stdin: an empty reply approves the plan, any other
    text is sent back as revision feedback. The reply is delivered with
    ``workflow.run(stream=True, responses=...)``, and this repeats until an
    output event is observed, whose last message is printed.
    """
    researcher_agent = ChatAgent(
        name="ResearcherAgent",
        description="Specialist in research and information gathering",
        instructions="You are a Researcher. You find information and gather facts.",
        chat_client=OpenAIChatClient(model_id="gpt-4o"),
    )
    analyst_agent = ChatAgent(
        name="AnalystAgent",
        description="Data analyst who processes and summarizes research findings",
        instructions="You are an Analyst. You analyze findings and create summaries.",
        chat_client=OpenAIChatClient(model_id="gpt-4o"),
    )
    manager_agent = ChatAgent(
        name="MagenticManager",
        description="Orchestrator that coordinates the workflow",
        instructions="You coordinate a team to complete tasks efficiently.",
        chat_client=OpenAIChatClient(model_id="gpt-4o"),
    )

    print("\nBuilding Magentic Workflow with Human Plan Review...")
    workflow = (
        MagenticBuilder()
        .participants([researcher_agent, analyst_agent])
        .with_manager(
            agent=manager_agent,
            max_round_count=10,
            max_stall_count=1,
            max_reset_count=2,
        )
        .with_plan_review()  # Request human input for plan review
        .build()
    )

    task = "Research sustainable aviation fuel technology and summarize the findings."
    print(f"\nTask: {task}")
    print("\nStarting workflow execution...")
    print("=" * 60)

    pending_request: WorkflowEvent | None = None
    pending_responses: dict[str, object] | None = None
    output_event: WorkflowEvent | None = None

    while output_event is None:
        # The first pass starts the task; later passes deliver the human's reply.
        if pending_responses is not None:
            stream = workflow.run(stream=True, responses=pending_responses)
        else:
            stream = workflow.run(task, stream=True)

        last_message_id: str | None = None
        async for event in stream:
            if isinstance(event, AgentRunUpdateEvent):
                message_id = event.data.message_id
                if message_id != last_message_id:
                    # A new message started: close the previous one and print a header.
                    if last_message_id is not None:
                        print("\n")
                    print(f"- {event.executor_id}:", end=" ", flush=True)
                    last_message_id = message_id
                print(event.data, end="", flush=True)
            elif event.type == "request_info" and event.request_type is MagenticPlanReviewRequest:
                pending_request = event
            elif event.type == "output":
                output_event = event

        pending_responses = None

        if pending_request is None:
            if output_event is None:
                # No output and nothing to answer: looping again would restart
                # the task from scratch, so stop instead of spinning.
                break
            continue

        # Handle the plan review request.
        event_data = cast(MagenticPlanReviewRequest, pending_request.data)
        print("\n\n[Magentic Plan Review Request]")
        if event_data.current_progress is not None:
            print("Current Progress Ledger:")
            print(json.dumps(event_data.current_progress.to_dict(), indent=2))
            print()
        print(f"Proposed Plan:\n{event_data.plan.text}\n")
        print("Please provide your feedback (press Enter to approve):")

        # input() blocks, so run it in a worker thread to keep the event loop free.
        reply = await asyncio.to_thread(input, "> ")
        if not reply.strip():
            print("Plan approved.\n")
            pending_responses = {pending_request.request_id: event_data.approve()}
        else:
            print("Plan revised by human.\n")
            pending_responses = {pending_request.request_id: event_data.revise(reply)}
        pending_request = None

    if output_event is None:
        print("\nWorkflow ended without producing an output.")
        return

    print("\n" + "=" * 60)
    print("WORKFLOW COMPLETED")
    print("=" * 60)
    print("Final Output:")
    # The output of the Magentic workflow is a list of ChatMessages with only one final message
    # generated by the orchestrator.
    output_messages = cast(list[ChatMessage], output_event.data)
    if output_messages:
        output = output_messages[-1].text
        print(output)
if __name__ == "__main__":
asyncio.run(main())
@@ -155,7 +155,7 @@ async def main() -> None:
print("-" * 60)
# Initiate the first run of the workflow.
# Runs are not isolated; state is preserved across multiple calls to run or send_responses_streaming.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(
"Manage my portfolio. Use a max of 5000 dollars to adjust my position using "
"your best judgment based on market sentiment. No need to confirm trades with me.",
@@ -166,7 +166,7 @@ async def main() -> None:
while pending_responses is not None:
# Run the workflow until there is no more human feedback to provide,
# in which case this workflow completes.
stream = workflow.send_responses_streaming(pending_responses)
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
"""
@@ -165,7 +165,7 @@ async def main() -> None:
print("-" * 60)
# Initiate the first run of the workflow.
# Runs are not isolated; state is preserved across multiple calls to run or send_responses_streaming.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(
"We need to deploy version 2.4.0 to production. Please coordinate the deployment.", stream=True
)
@@ -174,7 +174,7 @@ async def main() -> None:
while pending_responses is not None:
# Run the workflow until there is no more human feedback to provide,
# in which case this workflow completes.
stream = workflow.send_responses_streaming(pending_responses)
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
"""
@@ -34,8 +34,8 @@ requiring any additional builder configuration.
Demonstrate:
- Using @tool(approval_mode="always_require") for sensitive operations.
- Handling with function_approval_request Content in sequential workflows.
- Resuming workflow execution after approval via send_responses_streaming.
- Handling request_info events with function_approval_request Content in sequential workflows.
- Resuming workflow execution after approval via run(responses=..., stream=True).
Prerequisites:
- OpenAI or Azure OpenAI configured with the required environment variables.
@@ -118,7 +118,7 @@ async def main() -> None:
print("-" * 60)
# Initiate the first run of the workflow.
# Runs are not isolated; state is preserved across multiple calls to run or send_responses_streaming.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(
"Check the schema and then update all orders with status 'pending' to 'processing'", stream=True
)
@@ -127,7 +127,7 @@ async def main() -> None:
while pending_responses is not None:
# Run the workflow until there is no more human feedback to provide,
# in which case this workflow completes.
stream = workflow.send_responses_streaming(pending_responses)
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
"""
@@ -252,7 +252,7 @@ async def run_agent_framework_example(initial_task: str, scripted_responses: Seq
except StopIteration:
user_reply = "Thanks, that's all."
responses = {request.request_id: user_reply for request in pending}
final_events = await _drain_events(workflow.send_responses_streaming(responses))
final_events = await _drain_events(workflow.run(stream=True, responses=responses))
pending = _collect_handoff_requests(final_events)
conversation = _extract_final_conversation(final_events)