Python: Handle agent user input request in AgentExecutor (#2022)

* Handle agent user input request in AgentExecutor

* fix test

* Address comments

* Fix tests

* Fix tests

* Address comments

* Address comments
This commit is contained in:
Tao Chen
2025-11-11 10:59:57 -08:00
committed by GitHub
Unverified
parent 5fd2a0c287
commit cd9073aa11
6 changed files with 727 additions and 45 deletions
@@ -2,11 +2,14 @@
import logging
from dataclasses import dataclass
from typing import Any
from typing import Any, cast
from agent_framework import FunctionApprovalRequestContent, FunctionApprovalResponseContent
from .._agents import AgentProtocol, ChatAgent
from .._threads import AgentThread
from .._types import AgentRunResponse, AgentRunResponseUpdate, ChatMessage
from ._checkpoint_encoding import decode_checkpoint_value, encode_checkpoint_value
from ._conversation_state import encode_chat_messages
from ._events import (
AgentRunEvent,
@@ -14,6 +17,7 @@ from ._events import (
)
from ._executor import Executor, handler
from ._message_utils import normalize_messages_input
from ._request_info_mixin import response_handler
from ._workflow_context import WorkflowContext
logger = logging.getLogger(__name__)
@@ -83,6 +87,8 @@ class AgentExecutor(Executor):
super().__init__(exec_id)
self._agent = agent
self._agent_thread = agent_thread or self._agent.get_new_thread()
self._pending_agent_requests: dict[str, FunctionApprovalRequestContent] = {}
self._pending_responses_to_agent: list[FunctionApprovalResponseContent] = []
self._output_response = output_response
self._cache: list[ChatMessage] = []
@@ -93,50 +99,6 @@ class AgentExecutor(Executor):
return [AgentRunResponse]
return []
async def _run_agent_and_emit(self, ctx: WorkflowContext[AgentExecutorResponse, AgentRunResponse]) -> None:
"""Execute the underlying agent, emit events, and enqueue response.
Checks ctx.is_streaming() to determine whether to emit incremental AgentRunUpdateEvent
events (streaming mode) or a single AgentRunEvent (non-streaming mode).
"""
if ctx.is_streaming():
# Streaming mode: emit incremental updates
updates: list[AgentRunResponseUpdate] = []
async for update in self._agent.run_stream(
self._cache,
thread=self._agent_thread,
):
updates.append(update)
await ctx.add_event(AgentRunUpdateEvent(self.id, update))
if isinstance(self._agent, ChatAgent):
response_format = self._agent.chat_options.response_format
response = AgentRunResponse.from_agent_run_response_updates(
updates,
output_format_type=response_format,
)
else:
response = AgentRunResponse.from_agent_run_response_updates(updates)
else:
# Non-streaming mode: use run() and emit single event
response = await self._agent.run(
self._cache,
thread=self._agent_thread,
)
await ctx.add_event(AgentRunEvent(self.id, response))
if self._output_response:
await ctx.yield_output(response)
# Always construct a full conversation snapshot from inputs (cache)
# plus agent outputs (agent_run_response.messages). Do not mutate
# response.messages so AgentRunEvent remains faithful to the raw output.
full_conversation: list[ChatMessage] = list(self._cache) + list(response.messages)
agent_response = AgentExecutorResponse(self.id, response, full_conversation=full_conversation)
await ctx.send_message(agent_response)
self._cache.clear()
@handler
async def run(
self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse, AgentRunResponse]
@@ -192,6 +154,31 @@ class AgentExecutor(Executor):
self._cache = normalize_messages_input(messages)
await self._run_agent_and_emit(ctx)
@response_handler
async def handle_user_input_response(
self,
original_request: FunctionApprovalRequestContent,
response: FunctionApprovalResponseContent,
ctx: WorkflowContext[AgentExecutorResponse, AgentRunResponse],
) -> None:
"""Handle user input responses for function approvals during agent execution.
This will hold the executor's execution until all pending user input requests are resolved.
Args:
original_request: The original function approval request sent by the agent.
response: The user's response to the function approval request.
ctx: The workflow context for emitting events and outputs.
"""
self._pending_responses_to_agent.append(response)
self._pending_agent_requests.pop(original_request.id, None)
if not self._pending_agent_requests:
# All pending requests have been resolved; resume agent execution
self._cache = normalize_messages_input(ChatMessage(role="user", contents=self._pending_responses_to_agent))
self._pending_responses_to_agent.clear()
await self._run_agent_and_emit(ctx)
async def snapshot_state(self) -> dict[str, Any]:
"""Capture current executor state for checkpointing.
@@ -226,6 +213,8 @@ class AgentExecutor(Executor):
return {
"cache": encode_chat_messages(self._cache),
"agent_thread": serialized_thread,
"pending_agent_requests": encode_checkpoint_value(self._pending_agent_requests),
"pending_responses_to_agent": encode_checkpoint_value(self._pending_responses_to_agent),
}
async def restore_state(self, state: dict[str, Any]) -> None:
@@ -258,7 +247,109 @@ class AgentExecutor(Executor):
else:
self._agent_thread = self._agent.get_new_thread()
pending_requests_payload = state.get("pending_agent_requests")
if pending_requests_payload:
self._pending_agent_requests = decode_checkpoint_value(pending_requests_payload)
pending_responses_payload = state.get("pending_responses_to_agent")
if pending_responses_payload:
self._pending_responses_to_agent = decode_checkpoint_value(pending_responses_payload)
def reset(self) -> None:
"""Reset the internal cache of the executor."""
logger.debug("AgentExecutor %s: Resetting cache", self.id)
self._cache.clear()
async def _run_agent_and_emit(self, ctx: WorkflowContext[AgentExecutorResponse, AgentRunResponse]) -> None:
"""Execute the underlying agent, emit events, and enqueue response.
Checks ctx.is_streaming() to determine whether to emit incremental AgentRunUpdateEvent
events (streaming mode) or a single AgentRunEvent (non-streaming mode).
"""
if ctx.is_streaming():
# Streaming mode: emit incremental updates
response = await self._run_agent_streaming(cast(WorkflowContext, ctx))
else:
# Non-streaming mode: use run() and emit single event
response = await self._run_agent(cast(WorkflowContext, ctx))
if response is None:
# Agent did not complete (e.g., waiting for user input); do not emit response
logger.info("AgentExecutor %s: Agent did not complete, awaiting user input", self.id)
return
if self._output_response:
await ctx.yield_output(response)
# Always construct a full conversation snapshot from inputs (cache)
# plus agent outputs (agent_run_response.messages). Do not mutate
# response.messages so AgentRunEvent remains faithful to the raw output.
full_conversation: list[ChatMessage] = list(self._cache) + list(response.messages)
agent_response = AgentExecutorResponse(self.id, response, full_conversation=full_conversation)
await ctx.send_message(agent_response)
self._cache.clear()
async def _run_agent(self, ctx: WorkflowContext) -> AgentRunResponse | None:
"""Execute the underlying agent in non-streaming mode.
Args:
ctx: The workflow context for emitting events.
Returns:
The complete AgentRunResponse, or None if waiting for user input.
"""
response = await self._agent.run(
self._cache,
thread=self._agent_thread,
)
await ctx.add_event(AgentRunEvent(self.id, response))
# Handle any user input requests
if response.user_input_requests:
for user_input_request in response.user_input_requests:
self._pending_agent_requests[user_input_request.id] = user_input_request
await ctx.request_info(user_input_request, FunctionApprovalResponseContent)
return None
return response
async def _run_agent_streaming(self, ctx: WorkflowContext) -> AgentRunResponse | None:
"""Execute the underlying agent in streaming mode and collect the full response.
Args:
ctx: The workflow context for emitting events.
Returns:
The complete AgentRunResponse, or None if waiting for user input.
"""
updates: list[AgentRunResponseUpdate] = []
user_input_requests: list[FunctionApprovalRequestContent] = []
async for update in self._agent.run_stream(
self._cache,
thread=self._agent_thread,
):
updates.append(update)
await ctx.add_event(AgentRunUpdateEvent(self.id, update))
if update.user_input_requests:
user_input_requests.extend(update.user_input_requests)
# Build the final AgentRunResponse from the collected updates
if isinstance(self._agent, ChatAgent):
response_format = self._agent.chat_options.response_format
response = AgentRunResponse.from_agent_run_response_updates(
updates,
output_format_type=response_format,
)
else:
response = AgentRunResponse.from_agent_run_response_updates(updates)
# Handle any user input requests after the streaming completes
if user_input_requests:
for user_input_request in user_input_requests:
self._pending_agent_requests[user_input_request.id] = user_input_request
await ctx.request_info(user_input_request, FunctionApprovalResponseContent)
return None
return response
@@ -111,6 +111,10 @@ async def test_agent_executor_checkpoint_stores_and_restores_state() -> None:
chat_store_state = thread_state["chat_message_store_state"] # type: ignore[index]
assert "messages" in chat_store_state, "Message store state should include messages"
# Verify checkpoint contains pending requests from agents and responses to be sent
assert "pending_agent_requests" in executor_state
assert "pending_responses_to_agent" in executor_state
# Create a new agent and executor for restoration
# This simulates starting from a fresh state and restoring from checkpoint
restored_agent = _CountingAgent(id="test_agent", name="TestAgent")
@@ -5,19 +5,32 @@
from collections.abc import AsyncIterable
from typing import Any
from typing_extensions import Never
from agent_framework import (
AgentExecutor,
AgentExecutorResponse,
AgentRunResponse,
AgentRunResponseUpdate,
AgentRunUpdateEvent,
AgentThread,
BaseAgent,
ChatAgent,
ChatMessage,
ChatResponse,
ChatResponseUpdate,
FunctionApprovalRequestContent,
FunctionCallContent,
FunctionResultContent,
RequestInfoEvent,
Role,
TextContent,
WorkflowBuilder,
WorkflowContext,
WorkflowOutputEvent,
ai_function,
executor,
use_function_invocation,
)
@@ -120,3 +133,235 @@ async def test_agent_executor_emits_tool_calls_in_streaming_mode() -> None:
assert events[3].data is not None
assert isinstance(events[3].data.contents[0], TextContent)
assert "sunny" in events[3].data.contents[0].text
@ai_function(approval_mode="always_require")
def mock_tool_requiring_approval(query: str) -> str:
"""Mock tool that requires approval before execution."""
return f"Executed tool with query: {query}"
@use_function_invocation
class MockChatClient:
"""Simple implementation of a chat client."""
def __init__(self, parallel_request: bool = False) -> None:
self.additional_properties: dict[str, Any] = {}
self._iteration: int = 0
self._parallel_request: bool = parallel_request
async def get_response(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage],
**kwargs: Any,
) -> ChatResponse:
if self._iteration == 0:
if self._parallel_request:
response = ChatResponse(
messages=ChatMessage(
role="assistant",
contents=[
FunctionCallContent(
call_id="1", name="mock_tool_requiring_approval", arguments='{"query": "test"}'
),
FunctionCallContent(
call_id="2", name="mock_tool_requiring_approval", arguments='{"query": "test"}'
),
],
)
)
else:
response = ChatResponse(
messages=ChatMessage(
role="assistant",
contents=[
FunctionCallContent(
call_id="1", name="mock_tool_requiring_approval", arguments='{"query": "test"}'
)
],
)
)
else:
response = ChatResponse(messages=ChatMessage(role="assistant", text="Tool executed successfully."))
self._iteration += 1
return response
async def get_streaming_response(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage],
**kwargs: Any,
) -> AsyncIterable[ChatResponseUpdate]:
if self._iteration == 0:
if self._parallel_request:
yield ChatResponseUpdate(
contents=[
FunctionCallContent(
call_id="1", name="mock_tool_requiring_approval", arguments='{"query": "test"}'
),
FunctionCallContent(
call_id="2", name="mock_tool_requiring_approval", arguments='{"query": "test"}'
),
],
role="assistant",
)
else:
yield ChatResponseUpdate(
contents=[
FunctionCallContent(
call_id="1", name="mock_tool_requiring_approval", arguments='{"query": "test"}'
)
],
role="assistant",
)
else:
yield ChatResponseUpdate(text=TextContent(text="Tool executed "), role="assistant")
yield ChatResponseUpdate(contents=[TextContent(text="successfully.")], role="assistant")
self._iteration += 1
@executor(id="test_executor")
async def test_executor(agent_executor_response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(agent_executor_response.agent_run_response.text)
async def test_agent_executor_tool_call_with_approval() -> None:
"""Test that AgentExecutor handles tool calls requiring approval."""
# Arrange
agent = ChatAgent(
chat_client=MockChatClient(),
name="ApprovalAgent",
tools=[mock_tool_requiring_approval],
)
workflow = WorkflowBuilder().set_start_executor(agent).add_edge(agent, test_executor).build()
# Act
events = await workflow.run("Invoke tool requiring approval")
# Assert
assert len(events.get_request_info_events()) == 1
approval_request = events.get_request_info_events()[0]
assert isinstance(approval_request.data, FunctionApprovalRequestContent)
assert approval_request.data.function_call.name == "mock_tool_requiring_approval"
assert approval_request.data.function_call.arguments == '{"query": "test"}'
# Act
events = await workflow.send_responses({approval_request.request_id: approval_request.data.create_response(True)})
# Assert
final_response = events.get_outputs()
assert len(final_response) == 1
assert final_response[0] == "Tool executed successfully."
async def test_agent_executor_tool_call_with_approval_streaming() -> None:
"""Test that AgentExecutor handles tool calls requiring approval in streaming mode."""
# Arrange
agent = ChatAgent(
chat_client=MockChatClient(),
name="ApprovalAgent",
tools=[mock_tool_requiring_approval],
)
workflow = WorkflowBuilder().set_start_executor(agent).add_edge(agent, test_executor).build()
# Act
request_info_events: list[RequestInfoEvent] = []
async for event in workflow.run_stream("Invoke tool requiring approval"):
if isinstance(event, RequestInfoEvent):
request_info_events.append(event)
# Assert
assert len(request_info_events) == 1
approval_request = request_info_events[0]
assert isinstance(approval_request.data, FunctionApprovalRequestContent)
assert approval_request.data.function_call.name == "mock_tool_requiring_approval"
assert approval_request.data.function_call.arguments == '{"query": "test"}'
# Act
output: str | None = None
async for event in workflow.send_responses_streaming({
approval_request.request_id: approval_request.data.create_response(True)
}):
if isinstance(event, WorkflowOutputEvent):
output = event.data
# Assert
assert output is not None
assert output == "Tool executed successfully."
async def test_agent_executor_parallel_tool_call_with_approval() -> None:
"""Test that AgentExecutor handles parallel tool calls requiring approval."""
# Arrange
agent = ChatAgent(
chat_client=MockChatClient(parallel_request=True),
name="ApprovalAgent",
tools=[mock_tool_requiring_approval],
)
workflow = WorkflowBuilder().set_start_executor(agent).add_edge(agent, test_executor).build()
# Act
events = await workflow.run("Invoke tool requiring approval")
# Assert
assert len(events.get_request_info_events()) == 2
for approval_request in events.get_request_info_events():
assert isinstance(approval_request.data, FunctionApprovalRequestContent)
assert approval_request.data.function_call.name == "mock_tool_requiring_approval"
assert approval_request.data.function_call.arguments == '{"query": "test"}'
# Act
responses = {
approval_request.request_id: approval_request.data.create_response(True) # type: ignore
for approval_request in events.get_request_info_events()
}
events = await workflow.send_responses(responses)
# Assert
final_response = events.get_outputs()
assert len(final_response) == 1
assert final_response[0] == "Tool executed successfully."
async def test_agent_executor_parallel_tool_call_with_approval_streaming() -> None:
"""Test that AgentExecutor handles parallel tool calls requiring approval in streaming mode."""
# Arrange
agent = ChatAgent(
chat_client=MockChatClient(parallel_request=True),
name="ApprovalAgent",
tools=[mock_tool_requiring_approval],
)
workflow = WorkflowBuilder().set_start_executor(agent).add_edge(agent, test_executor).build()
# Act
request_info_events: list[RequestInfoEvent] = []
async for event in workflow.run_stream("Invoke tool requiring approval"):
if isinstance(event, RequestInfoEvent):
request_info_events.append(event)
# Assert
assert len(request_info_events) == 2
for approval_request in request_info_events:
assert isinstance(approval_request.data, FunctionApprovalRequestContent)
assert approval_request.data.function_call.name == "mock_tool_requiring_approval"
assert approval_request.data.function_call.arguments == '{"query": "test"}'
# Act
responses = {
approval_request.request_id: approval_request.data.create_response(True) # type: ignore
for approval_request in request_info_events
}
output: str | None = None
async for event in workflow.send_responses_streaming(responses):
if isinstance(event, WorkflowOutputEvent):
output = event.data
# Assert
assert output is not None
assert output == "Tool executed successfully."
+1
View File
@@ -281,6 +281,7 @@ This directory contains samples demonstrating the capabilities of Microsoft Agen
| File | Description |
|------|-------------|
| [`getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py`](./getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py) | Sample: Human in the loop guessing game |
| [`getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py`](./getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py) | Sample: Agents with Approval Requests in Workflows |
### Observability
@@ -78,6 +78,7 @@ Once comfortable with these, explore the rest of the samples below.
|---|---|---|
| Human-In-The-Loop (Guessing Game) | [human-in-the-loop/guessing_game_with_human_input.py](./human-in-the-loop/guessing_game_with_human_input.py) | Interactive request/response prompts with a human |
| Azure Agents Tool Feedback Loop | [agents/azure_chat_agents_tool_calls_with_feedback.py](./agents/azure_chat_agents_tool_calls_with_feedback.py) | Two-agent workflow that streams tool calls and pauses for human guidance between passes |
| Agents with Approval Requests in Workflows | [human-in-the-loop/agents_with_approval_requests.py](./human-in-the-loop/agents_with_approval_requests.py) | Agents that create approval requests during workflow execution and wait for human approval to proceed |
### observability
@@ -0,0 +1,340 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import json
from dataclasses import dataclass
from typing import Annotated, Never
from agent_framework import (
AgentExecutorResponse,
ChatMessage,
Executor,
FunctionApprovalRequestContent,
FunctionApprovalResponseContent,
WorkflowBuilder,
WorkflowContext,
ai_function,
executor,
handler,
)
from agent_framework.openai import OpenAIChatClient
"""
Sample: Agents in a workflow with AI functions requiring approval
This sample creates a workflow that automatically replies to incoming emails.
If historical email data is needed, it uses an AI function to read the data,
which requires human approval before execution.
This sample works as follows:
1. An incoming email is received by the workflow.
2. The EmailPreprocessor executor preprocesses the email, adding special notes if the sender is important.
3. The preprocessed email is sent to the Email Writer agent, which generates a response.
4. If the agent needs to read historical email data, it calls the read_historical_email_data AI function,
which triggers an approval request.
5. The sample automatically approves the request for demonstration purposes.
6. Once approved, the AI function executes and returns the historical email data to the agent.
7. The agent uses the historical data to compose a comprehensive email response.
8. The response is sent to the conclude_workflow_executor, which yields the final response.
Purpose:
Show how to integrate AI functions with approval requests into a workflow.
Demonstrate:
- Creating AI functions that require approval before execution.
- Building a workflow that includes an agent and executors.
- Handling approval requests during workflow execution.
Prerequisites:
- Azure AI Agent Service configured, along with the required environment variables.
- Authentication via azure-identity. Use AzureCliCredential and run az login before executing the sample.
- Basic familiarity with WorkflowBuilder, edges, events, RequestInfoEvent, and streaming runs.
"""
@ai_function
def get_current_date() -> str:
"""Get the current date in YYYY-MM-DD format."""
# For demonstration purposes, we return a fixed date.
return "2025-11-07"
@ai_function
def get_team_members_email_addresses() -> list[dict[str, str]]:
"""Get the email addresses of team members."""
# In a real implementation, this might query a database or directory service.
return [
{
"name": "Alice",
"email": "alice@contoso.com",
"position": "Software Engineer",
"manager": "John Doe",
},
{
"name": "Bob",
"email": "bob@contoso.com",
"position": "Product Manager",
"manager": "John Doe",
},
{
"name": "Charlie",
"email": "charlie@contoso.com",
"position": "Senior Software Engineer",
"manager": "John Doe",
},
{
"name": "Mike",
"email": "mike@contoso.com",
"position": "Principal Software Engineer Manager",
"manager": "VP of Engineering",
},
]
@ai_function
def get_my_information() -> dict[str, str]:
"""Get my personal information."""
return {
"name": "John Doe",
"email": "john@contoso.com",
"position": "Software Engineer Manager",
"manager": "Mike",
}
@ai_function(approval_mode="always_require")
async def read_historical_email_data(
email_address: Annotated[str, "The email address to read historical data from"],
start_date: Annotated[str, "The start date in YYYY-MM-DD format"],
end_date: Annotated[str, "The end date in YYYY-MM-DD format"],
) -> list[dict[str, str]]:
"""Read historical email data for a given email address and date range."""
historical_data = {
"alice@contoso.com": [
{
"from": "alice@contoso.com",
"to": "john@contoso.com",
"date": "2025-11-05",
"subject": "Bug Bash Results",
"body": "We just completed the bug bash and found a few issues that need immediate attention.",
},
{
"from": "alice@contoso.com",
"to": "john@contoso.com",
"date": "2025-11-03",
"subject": "Code Freeze",
"body": "We are entering code freeze starting tomorrow.",
},
],
"bob@contoso.com": [
{
"from": "bob@contoso.com",
"to": "john@contoso.com",
"date": "2025-11-04",
"subject": "Team Outing",
"body": "Don't forget about the team outing this Friday!",
},
{
"from": "bob@contoso.com",
"to": "john@contoso.com",
"date": "2025-11-02",
"subject": "Requirements Update",
"body": "The requirements for the new feature have been updated. Please review them.",
},
],
"charlie@contoso.com": [
{
"from": "charlie@contoso.com",
"to": "john@contoso.com",
"date": "2025-11-05",
"subject": "Project Update",
"body": "The bug bash went well. A few critical bugs but should be fixed by the end of the week.",
},
{
"from": "charlie@contoso.com",
"to": "john@contoso.com",
"date": "2025-11-06",
"subject": "Code Review",
"body": "Please review my latest code changes.",
},
],
}
emails = historical_data.get(email_address, [])
return [email for email in emails if start_date <= email["date"] <= end_date]
@ai_function(approval_mode="always_require")
async def send_email(
to: Annotated[str, "The recipient email address"],
subject: Annotated[str, "The email subject"],
body: Annotated[str, "The email body"],
) -> str:
"""Send an email."""
await asyncio.sleep(1) # Simulate sending email
return "Email successfully sent."
@dataclass
class Email:
sender: str
subject: str
body: str
class EmailPreprocessor(Executor):
def __init__(self, special_email_addresses: set[str]) -> None:
super().__init__(id="email_preprocessor")
self.special_email_addresses = special_email_addresses
@handler
async def preprocess(self, email: Email, ctx: WorkflowContext[str]) -> None:
"""Preprocess the incoming email."""
message = str(email)
if email.sender in self.special_email_addresses:
note = (
"Pay special attention to this sender. This email is very important. "
"Gather relevant information from all previous emails within my team before responding."
)
message = f"{note}\n\n{message}"
await ctx.send_message(message)
@executor(id="conclude_workflow_executor")
async def conclude_workflow(
email_response: AgentExecutorResponse,
ctx: WorkflowContext[Never, str],
) -> None:
"""Conclude the workflow by yielding the final email response."""
await ctx.yield_output(email_response.agent_run_response.text)
async def main() -> None:
# Create the agent and executors
chat_client = OpenAIChatClient()
email_writer = chat_client.create_agent(
name="Email Writer",
instructions=("You are an excellent email assistant. You respond to incoming emails."),
# tools with `approval_mode="always_require"` will trigger approval requests
tools=[
read_historical_email_data,
send_email,
get_current_date,
get_team_members_email_addresses,
get_my_information,
],
)
email_preprocessor = EmailPreprocessor(special_email_addresses={"mike@contoso.com"})
# Build the workflow
workflow = (
WorkflowBuilder()
.set_start_executor(email_preprocessor)
.add_edge(email_preprocessor, email_writer)
.add_edge(email_writer, conclude_workflow)
.build()
)
# Simulate an incoming email
incoming_email = Email(
sender="mike@contoso.com",
subject="Important: Project Update",
body="Please provide your team's status update on the project since last week.",
)
responses: dict[str, FunctionApprovalResponseContent] = {}
output: list[ChatMessage] | None = None
while True:
if responses:
events = await workflow.send_responses(responses)
responses.clear()
else:
events = await workflow.run(incoming_email)
request_info_events = events.get_request_info_events()
for request_info_event in request_info_events:
# We should only expect FunctionApprovalRequestContent in this sample
if not isinstance(request_info_event.data, FunctionApprovalRequestContent):
raise ValueError(f"Unexpected request info content type: {type(request_info_event.data)}")
# Pretty print the function call details
arguments = json.dumps(request_info_event.data.function_call.parse_arguments(), indent=2)
print(
f"Received approval request for function: {request_info_event.data.function_call.name} "
f"with args:\n{arguments}"
)
# For demo purposes, we automatically approve the request
# The expected response type of the request is `FunctionApprovalResponseContent`,
# which can be created via `create_response` method on the request content
print("Performing automatic approval for demo purposes...")
responses[request_info_event.request_id] = request_info_event.data.create_response(approved=True)
# Once we get an output event, we can conclude the workflow
# Outputs can only be produced by the conclude_workflow_executor in this sample
if outputs := events.get_outputs():
# We expect only one output from the conclude_workflow_executor
output = outputs[0]
break
if not output:
raise RuntimeError("Workflow did not produce any output event.")
print("Final email response conversation:")
print(output)
"""
Sample Output:
Received approval request for function: read_historical_email_data with args:
{
"email_address": "alice@contoso.com",
"start_date": "2025-10-31",
"end_date": "2025-11-07"
}
Performing automatic approval for demo purposes...
Received approval request for function: read_historical_email_data with args:
{
"email_address": "bob@contoso.com",
"start_date": "2025-10-31",
"end_date": "2025-11-07"
}
Performing automatic approval for demo purposes...
Received approval request for function: read_historical_email_data with args:
{
"email_address": "charlie@contoso.com",
"start_date": "2025-10-31",
"end_date": "2025-11-07"
}
Performing automatic approval for demo purposes...
Received approval request for function: send_email with args:
{
"to": "mike@contoso.com",
"subject": "Team's Status Update on the Project",
"body": "
Hi Mike,
Here's the status update from our team:
- **Bug Bash and Code Freeze:**
- We recently completed a bug bash, during which several issues were identified. Alice and Charlie are working on fixing these critical bugs, and we anticipate resolving them by the end of this week.
- We have entered a code freeze as of November 4, 2025.
- **Requirements Update:**
- Bob has updated the requirements for a new feature, and all team members are reviewing these changes to ensure alignment.
- **Ongoing Reviews:**
- Charlie has submitted his latest code changes for review to ensure they meet our quality standards.
Please let me know if you need more detailed information or have any questions.
Best regards,
John"
}
Performing automatic approval for demo purposes...
Final email response conversation:
I've sent the status update to Mike with the relevant information from the team. Let me know if there's anything else you need
""" # noqa: E501
if __name__ == "__main__":
asyncio.run(main())