Python: extend HITL support for all orchestration patterns (#2620)

* Support HITL for orchestration patterns

* Cleanup around naming

* Fix typing issues

* Clean up

* Naming clean up

* Updates to HITL to make it cleaner

* Rename human input hook to orchestration request info

* Clean up per PR feedback
This commit is contained in:
Evan Mattson
2025-12-11 00:59:29 +09:00
committed by GitHub
Unverified
parent 0d9ae1920d
commit b378ca75d1
23 changed files with 2186 additions and 36 deletions
@@ -85,6 +85,7 @@ from ._magentic import (
MagenticStallInterventionRequest,
StandardMagenticManager,
)
from ._orchestration_request_info import AgentInputRequest, AgentResponseReviewRequest, RequestInfoInterceptor
from ._orchestration_state import OrchestrationState
from ._request_info_mixin import response_handler
from ._runner import Runner
@@ -122,6 +123,8 @@ __all__ = [
"AgentExecutor",
"AgentExecutorRequest",
"AgentExecutorResponse",
"AgentInputRequest",
"AgentResponseReviewRequest",
"AgentRunEvent",
"AgentRunUpdateEvent",
"Case",
@@ -164,6 +167,7 @@ __all__ = [
"Message",
"OrchestrationState",
"RequestInfoEvent",
"RequestInfoInterceptor",
"Runner",
"RunnerContext",
"SequentialBuilder",
@@ -47,7 +47,9 @@ class BaseGroupChatOrchestrator(Executor, ABC):
self._max_rounds: int | None = None
self._termination_condition: Callable[[list[ChatMessage]], bool | Awaitable[bool]] | None = None
def register_participant_entry(self, name: str, *, entry_id: str, is_agent: bool) -> None:
def register_participant_entry(
self, name: str, *, entry_id: str, is_agent: bool, exit_id: str | None = None
) -> None:
"""Record routing details for a participant's entry executor.
This method provides a unified interface for registering participants
@@ -57,8 +59,10 @@ class BaseGroupChatOrchestrator(Executor, ABC):
name: Participant name (used for selection and tracking)
entry_id: Executor ID for this participant's entry point
is_agent: Whether this is an AgentExecutor (True) or custom Executor (False)
exit_id: Executor ID for this participant's exit point (where responses come from).
If None, defaults to entry_id.
"""
self._registry.register(name, entry_id=entry_id, is_agent=is_agent)
self._registry.register(name, entry_id=entry_id, is_agent=is_agent, exit_id=exit_id)
# Conversation state management (shared across all patterns)
@@ -14,6 +14,7 @@ from ._agent_executor import AgentExecutorRequest, AgentExecutorResponse
from ._checkpoint import CheckpointStorage
from ._executor import Executor, handler
from ._message_utils import normalize_messages_input
from ._orchestration_request_info import RequestInfoInterceptor
from ._workflow import Workflow
from ._workflow_builder import WorkflowBuilder
from ._workflow_context import WorkflowContext
@@ -209,15 +210,18 @@ class ConcurrentBuilder:
workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_custom_aggregator(summarize).build()
# Enable checkpoint persistence so runs can resume
workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_checkpointing(storage).build()
# Enable request info before aggregation
workflow = ConcurrentBuilder().participants([agent1, agent2]).with_request_info().build()
"""
def __init__(self) -> None:
self._participants: list[AgentProtocol | Executor] = []
self._aggregator: Executor | None = None
self._checkpoint_storage: CheckpointStorage | None = None
self._request_info_enabled: bool = False
def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "ConcurrentBuilder":
r"""Define the parallel participants for this concurrent workflow.
@@ -296,12 +300,33 @@ class ConcurrentBuilder:
self._checkpoint_storage = checkpoint_storage
return self
def with_request_info(self) -> "ConcurrentBuilder":
"""Enable request info before aggregation in the workflow.
When enabled, the workflow pauses after all parallel agents complete,
emitting a RequestInfoEvent that allows the caller to review and optionally
modify the combined results before aggregation. The caller provides feedback
via the standard response_handler/request_info pattern.
Note:
Unlike SequentialBuilder and GroupChatBuilder, ConcurrentBuilder does not
support per-agent filtering since all agents run in parallel and results
are collected together. The pause occurs once with all agent outputs received.
Returns:
self: The builder instance for fluent chaining.
"""
self._request_info_enabled = True
return self
def build(self) -> Workflow:
r"""Build and validate the concurrent workflow.
Wiring pattern:
- Dispatcher (internal) fans out the input to all `participants`
- Fan-in aggregator collects `AgentExecutorResponse` objects
- Fan-in collects `AgentExecutorResponse` objects from all participants
- If request info is enabled, the orchestration emits a request info event with outputs from all participants
before sending the outputs to the aggregator
- Aggregator yields output and the workflow becomes idle. The output is either:
- list[ChatMessage] (default aggregator: one user + one assistant per agent)
- custom payload from the provided callback/executor
@@ -327,7 +352,16 @@ class ConcurrentBuilder:
builder = WorkflowBuilder()
builder.set_start_executor(dispatcher)
builder.add_fan_out_edges(dispatcher, list(self._participants))
builder.add_fan_in_edges(list(self._participants), aggregator)
if self._request_info_enabled:
# Insert interceptor between fan-in and aggregator
# participants -> fan-in -> interceptor -> aggregator
request_info_interceptor = RequestInfoInterceptor(executor_id="request_info")
builder.add_fan_in_edges(list(self._participants), request_info_interceptor)
builder.add_edge(request_info_interceptor, aggregator)
else:
# Direct fan-in to aggregator
builder.add_fan_in_edges(list(self._participants), aggregator)
if self._checkpoint_storage is not None:
builder = builder.with_checkpointing(self._checkpoint_storage)
@@ -36,6 +36,7 @@ from ._base_group_chat_orchestrator import BaseGroupChatOrchestrator
from ._checkpoint import CheckpointStorage
from ._conversation_history import ensure_author, latest_user_message
from ._executor import Executor, handler
from ._orchestration_request_info import RequestInfoInterceptor
from ._participant_utils import GroupChatParticipantSpec, prepare_participant_metadata, wrap_participant
from ._workflow import Workflow
from ._workflow_builder import WorkflowBuilder
@@ -562,14 +563,36 @@ class GroupChatOrchestratorExecutor(BaseGroupChatOrchestrator):
participant_name: str,
message: ChatMessage,
ctx: WorkflowContext[AgentExecutorRequest | _GroupChatRequestMessage, list[ChatMessage]],
trailing_messages: list[ChatMessage] | None = None,
) -> None:
"""Common response ingestion logic shared by agent and custom participants."""
"""Common response ingestion logic shared by agent and custom participants.
Args:
participant_name: Name of the participant who sent the message
message: The participant's response message
ctx: Workflow context for routing and output
trailing_messages: Optional list of messages to inject after the participant's
message (e.g., additional input from the RequestInfoInterceptor)
"""
if participant_name not in self._participants:
raise ValueError(f"Received response from unknown participant '{participant_name}'.")
message = ensure_author(message, participant_name)
self._conversation.extend((message,))
self._history.append(_GroupChatTurn(participant_name, "agent", message))
# Inject any trailing messages (e.g., human input) into the conversation
if trailing_messages:
for trailing_msg in trailing_messages:
self._conversation.extend((trailing_msg,))
# Record as user input in history
author = trailing_msg.author_name or "human"
self._history.append(_GroupChatTurn(author, "user", trailing_msg))
logger.debug(
f"Injected human input into group chat conversation: "
f"{trailing_msg.text[:50] if trailing_msg.text else '(empty)'}..."
)
self._pending_agent = None
if await self._complete_on_termination(ctx):
@@ -685,14 +708,18 @@ class GroupChatOrchestratorExecutor(BaseGroupChatOrchestrator):
to the selected participant. This method implements the core orchestration
logic for agent-based managers.
Also handles any human input that was injected into the response's full_conversation
by the human input hook interceptor.
Args:
response: AgentExecutor response from manager agent
ctx: Workflow context for routing and output
Behavior:
- Extracts any human input from the response
- Parses manager selection from response
- If finish=True: yields final message and completes workflow
- If participant selected: routes request to that participant
- If participant selected: routes request to that participant with human input included
- Validates selected participant exists
- Enforces round limits if configured
@@ -700,6 +727,9 @@ class GroupChatOrchestratorExecutor(BaseGroupChatOrchestrator):
ValueError: If manager selects invalid/unknown participant
RuntimeError: If manager response cannot be parsed
"""
# Extract any human input that was injected by the human input hook
trailing_user_messages = self._extract_trailing_user_messages(response)
selection = self._parse_manager_selection(response)
if self._pending_finalization:
@@ -753,6 +783,19 @@ class GroupChatOrchestratorExecutor(BaseGroupChatOrchestrator):
self._conversation.append(manager_message)
self._history.append(_GroupChatTurn(self._manager_name, "manager", manager_message))
# Inject any human input that was attached to the manager's response
# This ensures the next participant sees the human's guidance
if trailing_user_messages:
for human_msg in trailing_user_messages:
conversation.append(human_msg)
self._conversation.append(human_msg)
author = human_msg.author_name or "human"
self._history.append(_GroupChatTurn(author, "user", human_msg))
logger.debug(
f"Injected human input after manager selection: "
f"{human_msg.text[:50] if human_msg.text else '(empty)'}..."
)
if await self._complete_on_termination(ctx):
return
@@ -808,6 +851,41 @@ class GroupChatOrchestratorExecutor(BaseGroupChatOrchestrator):
)
return ensure_author(final_message, participant_name)
@staticmethod
def _extract_trailing_user_messages(response: AgentExecutorResponse) -> list[ChatMessage]:
"""Extract any user messages that appear after the last assistant message.
This is used to capture human input that was injected by the human input hook
interceptor. The hook adds user messages to full_conversation after the agent's
response, so they appear at the end of the sequence.
Args:
response: AgentExecutor response that may contain trailing user messages
Returns:
List of user messages that appear after the last assistant message,
or empty list if none found
"""
if not response.full_conversation:
return []
# Find index of last assistant message
last_assistant_idx = -1
for i, msg in enumerate(response.full_conversation):
if msg.role == Role.ASSISTANT:
last_assistant_idx = i
if last_assistant_idx < 0:
return []
# Collect any user messages after the last assistant message
trailing_user: list[ChatMessage] = []
for msg in response.full_conversation[last_assistant_idx + 1 :]:
if msg.role == Role.USER:
trailing_user.append(msg)
return trailing_user
async def _handle_task_message(
self,
task_message: ChatMessage,
@@ -979,6 +1057,9 @@ class GroupChatOrchestratorExecutor(BaseGroupChatOrchestrator):
Routes responses based on whether they come from the manager or a participant:
- Manager responses: parsed for speaker selection decisions
- Participant responses: ingested as conversation messages
Also handles any human input that was injected into the response's full_conversation
by the human input hook interceptor.
"""
participant_name = self._registry.get_participant_name(response.executor_id)
if participant_name is None:
@@ -994,7 +1075,13 @@ class GroupChatOrchestratorExecutor(BaseGroupChatOrchestrator):
else:
# Regular participant response
message = self._extract_agent_message(response, participant_name)
await self._ingest_participant_message(participant_name, message, ctx)
# Check for human input injected by human input hook
# Human input appears as user messages at the end of full_conversation
# after the agent's assistant message
trailing_user_messages = self._extract_trailing_user_messages(response)
await self._ingest_participant_message(participant_name, message, ctx, trailing_user_messages)
def _default_orchestrator_factory(wiring: _GroupChatConfig) -> Executor:
@@ -1089,13 +1176,14 @@ def assemble_group_chat_workflow(
manager_entry = manager_pipeline[0]
manager_exit = manager_pipeline[-1]
# Register manager with orchestrator
# Register manager with orchestrator (with entry and exit IDs for pipeline routing)
register_entry = getattr(orchestrator, "register_participant_entry", None)
if callable(register_entry):
register_entry(
wiring.manager_name,
entry_id=manager_entry.id,
is_agent=not isinstance(wiring.manager_participant, Executor),
exit_id=manager_exit.id if manager_exit is not manager_entry else None,
)
# Wire manager edges: Orchestrator ↔ Manager
@@ -1118,10 +1206,13 @@ def assemble_group_chat_workflow(
register_entry = getattr(orchestrator, "register_participant_entry", None)
if callable(register_entry):
# Register both entry and exit IDs so responses can be routed correctly
# when interceptors are prepended to the pipeline
register_entry(
name,
entry_id=entry_executor.id,
is_agent=not isinstance(spec.participant, Executor),
exit_id=exit_executor.id if exit_executor is not entry_executor else None,
)
workflow_builder = workflow_builder.add_edge(orchestrator, entry_executor)
@@ -1213,6 +1304,30 @@ class GroupChatBuilder:
.build()
)
*Pattern 3: Request info for mid-conversation feedback*
.. code-block:: python
from agent_framework import GroupChatBuilder
# Pause before all participants
workflow = (
GroupChatBuilder()
.set_select_speakers_func(select_next_speaker)
.participants([researcher, writer])
.with_request_info()
.build()
)
# Pause only before specific participants
workflow = (
GroupChatBuilder()
.set_select_speakers_func(select_next_speaker)
.participants([researcher, writer, editor])
.with_request_info(agents=[editor]) # Only pause before editor responds
.build()
)
**Participant Specification:**
Two ways to specify participants:
@@ -1262,6 +1377,8 @@ class GroupChatBuilder:
self._interceptors: list[_InterceptorSpec] = []
self._orchestrator_factory = group_chat_orchestrator(_orchestrator_factory)
self._participant_factory = _participant_factory or _default_participant_factory
self._request_info_enabled: bool = False
self._request_info_filter: set[str] | None = None
def _set_manager_function(
self,
@@ -1338,6 +1455,12 @@ class GroupChatBuilder:
Note:
The manager agent's response_format must be ManagerSelectionResponse for structured output.
Custom response formats raise ValueError instead of being overridden.
The manager can be included in :py:meth:`with_request_info` to pause before the manager
runs, allowing human steering of orchestration decisions. If no filter is specified,
the manager is included automatically. To filter explicitly::
.with_request_info(agents=[manager, writer]) # Pause before manager and writer
"""
if self._manager is not None or self._manager_participant is not None:
raise ValueError(
@@ -1668,6 +1791,54 @@ class GroupChatBuilder:
self._max_rounds = max_rounds
return self
def with_request_info(
self,
*,
agents: Sequence[str | AgentProtocol | Executor] | None = None,
) -> "GroupChatBuilder":
"""Enable request info before participants run in the workflow.
When enabled, the workflow pauses before each participant runs, emitting
a RequestInfoEvent that allows the caller to review the conversation and
optionally inject guidance before the participant responds. The caller provides
input via the standard response_handler/request_info pattern.
Args:
agents: Optional filter - only pause before these specific agents/executors.
Accepts agent names (str), agent instances, or executor instances.
If None (default), pauses before every participant.
Returns:
self: The builder instance for fluent chaining.
Example:
.. code-block:: python
# Pause before all participants
workflow = (
GroupChatBuilder()
.set_manager(manager)
.participants([optimist, pragmatist, creative])
.with_request_info()
.build()
)
# Pause only before specific participants
workflow = (
GroupChatBuilder()
.set_manager(manager)
.participants([optimist, pragmatist, creative])
.with_request_info(agents=[pragmatist]) # Only pause before pragmatist
.build()
)
"""
from ._orchestration_request_info import resolve_request_info_filter
self._request_info_enabled = True
self._request_info_filter = resolve_request_info_filter(list(agents) if agents else None)
return self
def _get_participant_metadata(self) -> dict[str, Any]:
if self._participant_metadata is None:
self._participant_metadata = prepare_participant_metadata(
@@ -1754,9 +1925,32 @@ class GroupChatBuilder:
participant_executors=metadata["executors"],
)
# Determine participant factory - wrap if request info is enabled
participant_factory = self._participant_factory
if self._request_info_enabled:
# Create a wrapper factory that adds request info interceptor before each participant
base_factory = participant_factory
agent_filter = self._request_info_filter
def _factory_with_request_info(
spec: GroupChatParticipantSpec,
config: _GroupChatConfig,
) -> _GroupChatParticipantPipeline:
pipeline = list(base_factory(spec, config))
if pipeline:
# Add interceptor executor BEFORE the participant (prepend)
interceptor = RequestInfoInterceptor(
executor_id=f"request_info:{spec.name}",
agent_filter=agent_filter,
)
pipeline.insert(0, interceptor)
return tuple(pipeline)
participant_factory = _factory_with_request_info
result = assemble_group_chat_workflow(
wiring=wiring,
participant_factory=self._participant_factory,
participant_factory=participant_factory,
orchestrator_factory=self._orchestrator_factory,
interceptors=self._interceptors,
checkpoint_storage=self._checkpoint_storage,
@@ -45,8 +45,10 @@ from ._executor import Executor, handler
from ._group_chat import (
_default_participant_factory, # type: ignore[reportPrivateUsage]
_GroupChatConfig, # type: ignore[reportPrivateUsage]
_GroupChatParticipantPipeline, # type: ignore[reportPrivateUsage]
assemble_group_chat_workflow,
)
from ._orchestration_request_info import RequestInfoInterceptor
from ._orchestrator_helpers import clean_conversation_for_handoff
from ._participant_utils import GroupChatParticipantSpec, prepare_participant_metadata, sanitize_identifier
from ._request_info_mixin import response_handler
@@ -315,6 +317,30 @@ class _HandoffCoordinator(BaseGroupChatOrchestrator):
"""Get the coordinator name for orchestrator-generated messages."""
return "handoff_coordinator"
def _extract_agent_id_from_source(self, source: str | None) -> str | None:
"""Extract the original agent ID from the source executor ID.
When a request info interceptor is in the pipeline, the source will be
like 'request_info:agent_name'. This method extracts the
actual agent ID.
Args:
source: The source executor ID from the workflow context
Returns:
The actual agent ID, or the original source if not an interceptor
"""
if source is None:
return None
if source.startswith("request_info:"):
return source[len("request_info:") :]
# TODO(@moonbox3): Remove legacy prefix support in a separate PR (GA cleanup)
if source.startswith("human_review:"):
return source[len("human_review:") :]
if source.startswith("human_input_interceptor:"):
return source[len("human_input_interceptor:") :]
return source
@handler
async def handle_agent_response(
self,
@@ -322,7 +348,8 @@ class _HandoffCoordinator(BaseGroupChatOrchestrator):
ctx: WorkflowContext[AgentExecutorRequest | list[ChatMessage], list[ChatMessage] | _ConversationForUserInput],
) -> None:
"""Process an agent's response and determine whether to route, request input, or terminate."""
source = ctx.get_source_executor_id()
raw_source = ctx.get_source_executor_id()
source = self._extract_agent_id_from_source(raw_source)
is_starting_agent = source == self._starting_agent_id
# On first turn of a run, conversation is empty
@@ -400,8 +427,8 @@ class _HandoffCoordinator(BaseGroupChatOrchestrator):
cleaned_for_display = clean_conversation_for_handoff(conversation)
# The awaiting_agent_id is the agent that just responded and is awaiting user input
# This is the source of the current response
next_agent_id = source
# This is the source of the current response (fallback to starting agent if source is unknown)
next_agent_id = source or self._starting_agent_id
message_to_gateway = _ConversationForUserInput(conversation=cleaned_for_display, next_agent_id=next_agent_id)
await ctx.send_message(message_to_gateway, target_id=self._input_gateway_id) # type: ignore[arg-type]
@@ -826,6 +853,8 @@ class HandoffBuilder:
self._return_to_previous: bool = False
self._interaction_mode: Literal["human_in_loop", "autonomous"] = "human_in_loop"
self._autonomous_turn_limit: int | None = _DEFAULT_AUTONOMOUS_TURN_LIMIT
self._request_info_enabled: bool = False
self._request_info_filter: set[str] | None = None
if participants:
self.participants(participants)
@@ -1418,6 +1447,52 @@ class HandoffBuilder:
self._return_to_previous = enabled
return self
def with_request_info(
self,
*,
agents: Sequence[str | AgentProtocol | Executor] | None = None,
) -> "HandoffBuilder":
"""Enable request info before participants run in the workflow.
When enabled, the workflow pauses before each participant runs, emitting
a RequestInfoEvent that allows the caller to review the conversation and
optionally inject guidance before the participant responds. The caller provides
input via the standard response_handler/request_info pattern.
Args:
agents: Optional filter - only pause before these specific agents/executors.
Accepts agent names (str), agent instances, or executor instances.
If None (default), pauses before every participant.
Returns:
self: The builder instance for fluent chaining.
Example:
.. code-block:: python
# Pause before all participants
workflow = (
HandoffBuilder(participants=[coordinator, refund, shipping])
.set_coordinator("coordinator_agent")
.with_request_info()
.build()
)
# Pause only before specialist agents (not coordinator)
workflow = (
HandoffBuilder(participants=[coordinator, refund, shipping])
.set_coordinator("coordinator_agent")
.with_request_info(agents=[refund, shipping])
.build()
)
"""
from ._orchestration_request_info import resolve_request_info_filter
self._request_info_enabled = True
self._request_info_filter = resolve_request_info_filter(list(agents) if agents else None)
return self
def build(self) -> Workflow:
"""Construct the final Workflow instance from the configured builder.
@@ -1562,9 +1637,33 @@ class HandoffBuilder:
participant_executors=self._executors,
)
# Determine participant factory - wrap with request info interceptor if enabled
participant_factory: Callable[[GroupChatParticipantSpec, _GroupChatConfig], _GroupChatParticipantPipeline] = (
_default_participant_factory
)
if self._request_info_enabled:
base_factory = _default_participant_factory
agent_filter = self._request_info_filter
def _factory_with_request_info(
spec: GroupChatParticipantSpec,
config: _GroupChatConfig,
) -> _GroupChatParticipantPipeline:
pipeline = list(base_factory(spec, config))
if pipeline:
# Add interceptor executor BEFORE the participant (prepend)
interceptor = RequestInfoInterceptor(
executor_id=f"request_info:{spec.name}",
agent_filter=agent_filter,
)
pipeline.insert(0, interceptor)
return tuple(pipeline)
participant_factory = _factory_with_request_info
result = assemble_group_chat_workflow(
wiring=wiring,
participant_factory=_default_participant_factory,
participant_factory=participant_factory,
orchestrator_factory=_handoff_orchestrator_factory,
interceptors=(),
checkpoint_storage=self._checkpoint_storage,
@@ -1575,7 +1674,18 @@ class HandoffBuilder:
raise TypeError("Expected tuple from assemble_group_chat_workflow with return_builder=True")
builder, coordinator = result
builder = builder.add_edge(input_node, starting_executor)
# When request_info is enabled, the input should go through the interceptor first
if self._request_info_enabled:
# Get the entry executor from the builder's registered executors
starting_entry_id = f"request_info:{self._starting_agent_id}"
starting_entry_executor = builder._executors.get(starting_entry_id) # type: ignore
if starting_entry_executor:
builder = builder.add_edge(input_node, starting_entry_executor)
else:
# Fallback to direct connection if interceptor not found
builder = builder.add_edge(input_node, starting_executor)
else:
builder = builder.add_edge(input_node, starting_executor)
builder = builder.add_edge(coordinator, user_gateway)
builder = builder.add_edge(user_gateway, coordinator)
@@ -2089,6 +2089,17 @@ class MagenticBuilder:
The builder provides a fluent API for configuring participants, the manager, optional
plan review, checkpointing, and event callbacks.
Human-in-the-loop Support:
Magentic provides specialized HITL mechanisms via:
- `.with_plan_review()` - Review and approve/revise plans before execution
- `.with_human_input_on_stall()` - Intervene when workflow stalls
- Tool approval via `FunctionApprovalRequestContent` - Approve individual tool calls
These emit `MagenticHumanInterventionRequest` events that provide structured
decision options (APPROVE, REVISE, CONTINUE, REPLAN, GUIDANCE) appropriate
for Magentic's planning-based orchestration.
Usage:
.. code-block:: python
@@ -0,0 +1,329 @@
# Copyright (c) Microsoft. All rights reserved.
"""Request info support for high-level builder APIs.
This module provides a mechanism for pausing workflows to request external input
before agent turns in `SequentialBuilder`, `ConcurrentBuilder`, `GroupChatBuilder`,
and `HandoffBuilder`.
The design follows the standard `request_info` pattern used throughout the
workflow system, keeping the API consistent and predictable.
Key components:
- AgentInputRequest: Request type emitted via RequestInfoEvent for pre-agent steering
- RequestInfoInterceptor: Internal executor that pauses workflow before agent runs
"""
import logging
import uuid
from dataclasses import dataclass, field
from typing import Any
from .._agents import AgentProtocol
from .._types import ChatMessage, Role
from ._agent_executor import AgentExecutorRequest
from ._executor import Executor, handler
from ._request_info_mixin import response_handler
from ._workflow_context import WorkflowContext
logger = logging.getLogger(__name__)
def resolve_request_info_filter(
agents: list[str | AgentProtocol | Executor] | None,
) -> set[str] | None:
"""Resolve a list of agent/executor references to a set of IDs for filtering.
Args:
agents: List of agent names (str), AgentProtocol instances, or Executor instances.
If None, returns None (meaning no filtering - pause for all).
Returns:
Set of executor/agent IDs to filter on, or None if no filtering.
"""
if agents is None:
return None
result: set[str] = set()
for agent in agents:
if isinstance(agent, str):
result.add(agent)
elif isinstance(agent, Executor):
result.add(agent.id)
elif isinstance(agent, AgentProtocol):
name = getattr(agent, "name", None)
if name:
result.add(name)
else:
logger.warning("AgentProtocol without name cannot be used for request_info filtering")
else:
logger.warning(f"Unsupported type for request_info filter: {type(agent).__name__}")
return result if result else None
@dataclass
class AgentInputRequest:
"""Request for human input before an agent runs in high-level builder workflows.
Emitted via RequestInfoEvent when a workflow pauses before an agent executes.
The response is injected into the conversation as a user message to steer
the agent's behavior.
This is the standard request type used by `.with_request_info()` on
SequentialBuilder, ConcurrentBuilder, GroupChatBuilder, and HandoffBuilder.
Attributes:
target_agent_id: ID of the agent that is about to run
conversation: Current conversation history the agent will receive
instruction: Optional instruction from the orchestrator (e.g., manager in GroupChat)
metadata: Builder-specific context (stores internal state for resume)
"""
target_agent_id: str | None
conversation: list[ChatMessage] = field(default_factory=lambda: [])
instruction: str | None = None
metadata: dict[str, Any] = field(default_factory=lambda: {})
# Keep legacy name as alias for backward compatibility
AgentResponseReviewRequest = AgentInputRequest
DEFAULT_REQUEST_INFO_ID = "request_info_interceptor"
class RequestInfoInterceptor(Executor):
"""Internal executor that pauses workflow for human input before agent runs.
This executor is inserted into the workflow graph by builders when
`.with_request_info()` is called. It intercepts AgentExecutorRequest messages
BEFORE the agent runs and pauses the workflow via `ctx.request_info()` with
an AgentInputRequest.
When a response is received, the response handler injects the input
as a user message into the conversation and forwards the request to the agent.
The optional `agent_filter` parameter allows limiting which agents trigger the pause.
If the target agent's ID is not in the filter set, the request is forwarded
without pausing.
"""
def __init__(
self,
executor_id: str | None = None,
agent_filter: set[str] | None = None,
) -> None:
"""Initialize the request info interceptor executor.
Args:
executor_id: ID for this executor. If None, generates a unique ID
using the format "request_info_interceptor-<uuid4>".
agent_filter: Optional set of agent/executor IDs to filter on.
If provided, only requests to these agents trigger a pause.
If None (default), all requests trigger a pause.
"""
if executor_id is None:
executor_id = f"{DEFAULT_REQUEST_INFO_ID}-{uuid.uuid4().hex[:8]}"
super().__init__(executor_id)
self._agent_filter = agent_filter
def _should_pause_for_agent(self, agent_id: str | None) -> bool:
"""Check if we should pause for the given agent ID."""
if self._agent_filter is None:
return True
if agent_id is None:
return False
# Check both the full ID and any name portion after a prefix
# e.g., "groupchat_agent:writer" should match filter "writer"
if agent_id in self._agent_filter:
return True
# Extract name from prefixed IDs like "groupchat_agent:writer" or "request_info:writer"
if ":" in agent_id:
name_part = agent_id.split(":", 1)[1]
if name_part in self._agent_filter:
return True
return False
def _extract_agent_name_from_executor_id(self) -> str | None:
"""Extract the agent name from this interceptor's executor ID.
The interceptor ID is typically "request_info:<agent_name>", so we
extract the agent name to determine which agent we're intercepting for.
"""
if ":" in self.id:
return self.id.split(":", 1)[1]
return None
@handler
async def intercept_agent_request(
self,
request: AgentExecutorRequest,
ctx: WorkflowContext[AgentExecutorRequest, Any],
) -> None:
"""Intercept request before agent runs and pause for human input.
Pauses the workflow and emits a RequestInfoEvent with the current
conversation for steering. If an agent filter is configured and this
agent is not in the filter, the request is forwarded without pausing.
Args:
request: The request about to be sent to the agent
ctx: Workflow context for requesting info
"""
# Determine the target agent from our executor ID
target_agent = self._extract_agent_name_from_executor_id()
# Check if we should pause for this agent
if not self._should_pause_for_agent(target_agent):
logger.debug(f"Skipping request_info pause for agent {target_agent} (not in filter)")
await ctx.send_message(request)
return
conversation = list(request.messages or [])
input_request = AgentInputRequest(
target_agent_id=target_agent,
conversation=conversation,
instruction=None, # Could be extended to include manager instruction
metadata={"_original_request": request, "_input_type": "AgentExecutorRequest"},
)
await ctx.request_info(input_request, str)
@handler
async def intercept_conversation(
self,
messages: list[ChatMessage],
ctx: WorkflowContext[list[ChatMessage], Any],
) -> None:
"""Intercept conversation before agent runs (used by SequentialBuilder).
SequentialBuilder passes list[ChatMessage] directly to agents. This handler
intercepts that flow and pauses for human input.
Args:
messages: The conversation about to be sent to the agent
ctx: Workflow context for requesting info
"""
# Determine the target agent from our executor ID
target_agent = self._extract_agent_name_from_executor_id()
# Check if we should pause for this agent
if not self._should_pause_for_agent(target_agent):
logger.debug(f"Skipping request_info pause for agent {target_agent} (not in filter)")
await ctx.send_message(messages)
return
input_request = AgentInputRequest(
target_agent_id=target_agent,
conversation=list(messages),
instruction=None,
metadata={"_original_messages": messages, "_input_type": "list[ChatMessage]"},
)
await ctx.request_info(input_request, str)
@handler
async def intercept_concurrent_requests(
self,
requests: list[AgentExecutorRequest],
ctx: WorkflowContext[list[AgentExecutorRequest], Any],
) -> None:
"""Intercept requests before concurrent agents run.
This handler is used by ConcurrentBuilder to get human input before
all parallel agents execute.
Args:
requests: List of requests for all concurrent agents
ctx: Workflow context for requesting info
"""
# Combine conversations for display
combined_conversation: list[ChatMessage] = []
if requests:
combined_conversation = list(requests[0].messages or [])
input_request = AgentInputRequest(
target_agent_id=None, # Multiple agents
conversation=combined_conversation,
instruction=None,
metadata={"_original_requests": requests},
)
await ctx.request_info(input_request, str)
@response_handler
async def handle_input_response(
self,
original_request: AgentInputRequest,
# TODO(@moonbox3): Extend to support other content types
response: str,
ctx: WorkflowContext[AgentExecutorRequest | list[ChatMessage], Any],
) -> None:
"""Handle the human input and forward the modified request to the agent.
Injects the response as a user message into the conversation
and forwards the modified request to the agent.
Args:
original_request: The AgentInputRequest that triggered the pause
response: The human input text
ctx: Workflow context for continuing the workflow
TODO: Consider having each orchestration implement its own response handler
for more specialized behavior.
"""
human_message = ChatMessage(role=Role.USER, text=response)
# Handle concurrent case (list of AgentExecutorRequest)
original_requests: list[AgentExecutorRequest] | None = original_request.metadata.get("_original_requests")
if original_requests is not None:
updated_requests: list[AgentExecutorRequest] = []
for orig_req in original_requests:
messages = list(orig_req.messages or [])
messages.append(human_message)
updated_requests.append(
AgentExecutorRequest(
messages=messages,
should_respond=orig_req.should_respond,
)
)
logger.debug(
f"Human input received for concurrent workflow, "
f"continuing with {len(updated_requests)} updated requests"
)
await ctx.send_message(updated_requests) # type: ignore[arg-type]
return
# Handle list[ChatMessage] case (SequentialBuilder)
original_messages: list[ChatMessage] | None = original_request.metadata.get("_original_messages")
if original_messages is not None:
messages = list(original_messages)
messages.append(human_message)
logger.debug(
f"Human input received for agent {original_request.target_agent_id}, "
f"forwarding conversation with steering context"
)
await ctx.send_message(messages)
return
# Handle AgentExecutorRequest case (GroupChatBuilder)
orig_request: AgentExecutorRequest | None = original_request.metadata.get("_original_request")
if orig_request is not None:
messages = list(orig_request.messages or [])
messages.append(human_message)
updated_request = AgentExecutorRequest(
messages=messages,
should_respond=orig_request.should_respond,
)
logger.debug(
f"Human input received for agent {original_request.target_agent_id}, "
f"forwarding request with steering context"
)
await ctx.send_message(updated_request)
return
logger.error("Input response handler missing original request/messages in metadata")
raise RuntimeError("Missing original request or messages in AgentInputRequest metadata")
@@ -140,6 +140,9 @@ class ParticipantRegistry:
Provides a clean interface for the common pattern of mapping participant names
to executor IDs and tracking which are agents vs custom executors.
Tracks both entry IDs (where to send requests) and exit IDs (where responses
come from) to support pipeline configurations where these differ.
"""
def __init__(self) -> None:
@@ -154,19 +157,26 @@ class ParticipantRegistry:
*,
entry_id: str,
is_agent: bool,
exit_id: str | None = None,
) -> None:
"""Register a participant's routing information.
Args:
name: Participant name
entry_id: Executor ID for this participant's entry point
entry_id: Executor ID for this participant's entry point (where to send)
is_agent: Whether this is an AgentExecutor (True) or custom Executor (False)
exit_id: Executor ID for this participant's exit point (where responses come from).
If None, defaults to entry_id (single-executor pipeline).
"""
self._participant_entry_ids[name] = entry_id
actual_exit_id = exit_id if exit_id is not None else entry_id
if is_agent:
self._agent_executor_ids[name] = entry_id
# Map both entry and exit IDs to participant name for response routing
self._executor_id_to_participant[entry_id] = name
if actual_exit_id != entry_id:
self._executor_id_to_participant[actual_exit_id] = name
else:
self._non_agent_participants.add(name)
@@ -52,6 +52,7 @@ from ._executor import (
handler,
)
from ._message_utils import normalize_messages_input
from ._orchestration_request_info import RequestInfoInterceptor
from ._workflow import Workflow
from ._workflow_builder import WorkflowBuilder
from ._workflow_context import WorkflowContext
@@ -76,9 +77,7 @@ class _InputToConversation(Executor):
messages: list[str | ChatMessage],
ctx: WorkflowContext[list[ChatMessage]],
) -> None:
# Make a copy to avoid mutation downstream
normalized = normalize_messages_input(messages)
await ctx.send_message(list(normalized))
await ctx.send_message(normalize_messages_input(messages))
class _ResponseToConversation(Executor):
@@ -119,11 +118,24 @@ class SequentialBuilder:
# Enable checkpoint persistence
workflow = SequentialBuilder().participants([agent1, agent2]).with_checkpointing(storage).build()
# Enable request info for mid-workflow feedback (pauses before each agent)
workflow = SequentialBuilder().participants([agent1, agent2]).with_request_info().build()
# Enable request info only for specific agents
workflow = (
SequentialBuilder()
.participants([agent1, agent2, agent3])
.with_request_info(agents=[agent2]) # Only pause before agent2
.build()
)
"""
def __init__(self) -> None:
self._participants: list[AgentProtocol | Executor] = []
self._checkpoint_storage: CheckpointStorage | None = None
self._request_info_enabled: bool = False
self._request_info_filter: set[str] | None = None
def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "SequentialBuilder":
"""Define the ordered participants for this sequential workflow.
@@ -157,14 +169,56 @@ class SequentialBuilder:
self._checkpoint_storage = checkpoint_storage
return self
def with_request_info(
self,
*,
agents: Sequence[str | AgentProtocol | Executor] | None = None,
) -> "SequentialBuilder":
"""Enable request info before agents run in the workflow.
When enabled, the workflow pauses before each agent runs, emitting
a RequestInfoEvent that allows the caller to review the conversation and
optionally inject guidance before the agent responds. The caller provides
input via the standard response_handler/request_info pattern.
Args:
agents: Optional filter - only pause before these specific agents/executors.
Accepts agent names (str), agent instances, or executor instances.
If None (default), pauses before every agent.
Returns:
self: The builder instance for fluent chaining.
Example:
.. code-block:: python
# Pause before all agents
workflow = SequentialBuilder().participants([a1, a2]).with_request_info().build()
# Pause only before specific agents
workflow = (
SequentialBuilder()
.participants([drafter, reviewer, finalizer])
.with_request_info(agents=[reviewer]) # Only pause before reviewer
.build()
)
"""
from ._orchestration_request_info import resolve_request_info_filter
self._request_info_enabled = True
self._request_info_filter = resolve_request_info_filter(list(agents) if agents else None)
return self
def build(self) -> Workflow:
"""Build and validate the sequential workflow.
Wiring pattern:
- _InputToConversation normalizes the initial input into list[ChatMessage]
- For each participant in order:
- If Agent (or AgentExecutor): pass conversation to the agent, then convert response
to conversation via _ResponseToConversation
- If Agent (or AgentExecutor): pass conversation to the agent, then optionally
route through human input interceptor, then convert response to conversation
via _ResponseToConversation
- Else (custom Executor): pass conversation directly to the executor
- _EndWithConversation yields the final conversation and the workflow becomes idle
"""
@@ -184,12 +238,22 @@ class SequentialBuilder:
for p in self._participants:
# Agent-like branch: either explicitly an AgentExecutor or any non-AgentExecutor
if not (isinstance(p, Executor) and not isinstance(p, AgentExecutor)):
# input conversation -> (agent) -> response -> conversation
builder.add_edge(prior, p)
# Give the adapter a deterministic, self-describing id
# input conversation -> [human_input_interceptor] -> (agent) -> response -> conversation
label: str
label = p.id if isinstance(p, Executor) else getattr(p, "name", None) or p.__class__.__name__
resp_to_conv = _ResponseToConversation(id=f"to-conversation:{label}")
if self._request_info_enabled:
# Insert request info interceptor BEFORE the agent
interceptor = RequestInfoInterceptor(
executor_id=f"request_info:{label}",
agent_filter=self._request_info_filter,
)
builder.add_edge(prior, interceptor)
builder.add_edge(interceptor, p)
else:
builder.add_edge(prior, p)
builder.add_edge(p, resp_to_conv)
prior = resp_to_conv
elif isinstance(p, Executor):
@@ -521,9 +521,9 @@ class ObservabilitySettings(AFBaseSettings):
logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter))
should_add_console_exporter = False
if should_add_console_exporter:
from opentelemetry.sdk._logs.export import ConsoleLogExporter
from opentelemetry.sdk._logs.export import ConsoleLogRecordExporter
logger_provider.add_log_record_processor(BatchLogRecordProcessor(ConsoleLogExporter()))
logger_provider.add_log_record_processor(BatchLogRecordProcessor(ConsoleLogRecordExporter()))
# Attach a handler with the provider to the root logger
logger = logging.getLogger()
@@ -1082,3 +1082,106 @@ def test_set_manager_builds_with_agent_manager() -> None:
assert isinstance(orchestrator, GroupChatOrchestratorExecutor)
assert orchestrator._is_manager_agent()
async def test_group_chat_with_request_info_filtering():
"""Test that with_request_info(agents=[...]) only pauses before specified agents run."""
from agent_framework import AgentInputRequest, RequestInfoEvent
# Create agents - we want to verify only beta triggers pause
alpha = StubAgent("alpha", "response from alpha")
beta = StubAgent("beta", "response from beta")
# Manager that selects alpha first, then beta, then finishes
call_count = 0
async def selector(state: GroupChatStateSnapshot) -> str | None:
nonlocal call_count
call_count += 1
if call_count == 1:
return "alpha"
if call_count == 2:
return "beta"
return None
workflow = (
GroupChatBuilder()
.set_select_speakers_func(selector, display_name="manager", final_message="done")
.participants(alpha=alpha, beta=beta)
.with_request_info(agents=["beta"]) # Only pause before beta runs
.build()
)
# Run until we get a request info event (should be before beta, not alpha)
request_events: list[RequestInfoEvent] = []
async for event in workflow.run_stream("test task"):
if isinstance(event, RequestInfoEvent) and isinstance(event.data, AgentInputRequest):
request_events.append(event)
# Don't break - let stream complete naturally when paused
# Should have exactly one request event before beta
assert len(request_events) == 1
request_event = request_events[0]
# The target agent should be beta's executor ID (groupchat_agent:beta)
assert request_event.data.target_agent_id is not None
assert "beta" in request_event.data.target_agent_id
# Continue the workflow with a response
outputs: list[WorkflowOutputEvent] = []
async for event in workflow.send_responses_streaming({request_event.request_id: "continue please"}):
if isinstance(event, WorkflowOutputEvent):
outputs.append(event)
# Workflow should complete
assert len(outputs) == 1
async def test_group_chat_with_request_info_no_filter_pauses_all():
"""Test that with_request_info() without agents pauses before all participants."""
from agent_framework import AgentInputRequest, RequestInfoEvent
# Create agents
alpha = StubAgent("alpha", "response from alpha")
# Manager selects alpha then finishes
call_count = 0
async def selector(state: GroupChatStateSnapshot) -> str | None:
nonlocal call_count
call_count += 1
if call_count == 1:
return "alpha"
return None
workflow = (
GroupChatBuilder()
.set_select_speakers_func(selector, display_name="manager", final_message="done")
.participants(alpha=alpha)
.with_request_info() # No filter - pause for all
.build()
)
# Run until we get a request info event
request_events: list[RequestInfoEvent] = []
async for event in workflow.run_stream("test task"):
if isinstance(event, RequestInfoEvent) and isinstance(event.data, AgentInputRequest):
request_events.append(event)
break
# Should pause before alpha
assert len(request_events) == 1
assert request_events[0].data.target_agent_id is not None
assert "alpha" in request_events[0].data.target_agent_id
def test_group_chat_builder_with_request_info_returns_self():
"""Test that with_request_info() returns self for method chaining."""
builder = GroupChatBuilder()
result = builder.with_request_info()
assert result is builder
# Also test with agents parameter
builder2 = GroupChatBuilder()
result2 = builder2.with_request_info(agents=["test"])
assert result2 is builder2
@@ -687,6 +687,54 @@ async def test_tool_choice_preserved_from_agent_config():
assert str(last_tool_choice) == "required", f"Expected 'required', got {last_tool_choice}"
async def test_handoff_builder_with_request_info():
"""Test that HandoffBuilder supports request info via with_request_info()."""
from agent_framework import AgentInputRequest, RequestInfoEvent
# Create test agents
coordinator = _RecordingAgent(name="coordinator")
specialist = _RecordingAgent(name="specialist")
# Build workflow with request info enabled
workflow = (
HandoffBuilder(participants=[coordinator, specialist])
.set_coordinator("coordinator")
.with_termination_condition(lambda conv: len([m for m in conv if m.role == Role.USER]) >= 1)
.with_request_info()
.build()
)
# Run workflow until it pauses for request info
request_event: RequestInfoEvent | None = None
async for event in workflow.run_stream("Hello"):
if isinstance(event, RequestInfoEvent) and isinstance(event.data, AgentInputRequest):
request_event = event
# Verify request info was emitted
assert request_event is not None, "Request info should have been emitted"
assert isinstance(request_event.data, AgentInputRequest)
# Provide response and continue
output_events: list[WorkflowOutputEvent] = []
async for event in workflow.send_responses_streaming({request_event.request_id: "approved"}):
if isinstance(event, WorkflowOutputEvent):
output_events.append(event)
# Verify we got output events
assert len(output_events) > 0, "Should produce output events after response"
async def test_handoff_builder_with_request_info_method_chaining():
"""Test that with_request_info returns self for method chaining."""
coordinator = _RecordingAgent(name="coordinator")
builder = HandoffBuilder(participants=[coordinator])
result = builder.with_request_info()
assert result is builder, "with_request_info should return self for chaining"
assert builder._request_info_enabled is True # type: ignore
async def test_return_to_previous_state_serialization():
"""Test that return_to_previous state is properly serialized/deserialized for checkpointing."""
from agent_framework._workflows._handoff import _HandoffCoordinator # type: ignore[reportPrivateUsage]
@@ -857,3 +857,22 @@ async def test_magentic_checkpoint_runtime_overrides_buildtime() -> None:
assert len(runtime_checkpoints) > 0, "Runtime storage should have checkpoints"
assert len(buildtime_checkpoints) == 0, "Build-time storage should have no checkpoints when overridden"
def test_magentic_builder_does_not_have_human_input_hook():
"""Test that MagenticBuilder does not expose with_human_input_hook (uses specialized HITL instead).
Magentic uses specialized human intervention mechanisms:
- with_plan_review() for plan approval
- with_human_input_on_stall() for stall intervention
- Tool approval via FunctionApprovalRequestContent
These emit MagenticHumanInterventionRequest events with structured decision options.
"""
builder = MagenticBuilder()
# MagenticBuilder should NOT have the generic human input hook mixin
assert not hasattr(builder, "with_human_input_hook"), (
"MagenticBuilder should not have with_human_input_hook - "
"use with_plan_review() or with_human_input_on_stall() instead"
)
@@ -0,0 +1,168 @@
# Copyright (c) Microsoft. All rights reserved.
"""Unit tests for request info support in high-level builders."""
from typing import Any
from unittest.mock import MagicMock
from agent_framework import (
AgentInputRequest,
AgentProtocol,
AgentResponseReviewRequest,
ChatMessage,
RequestInfoInterceptor,
Role,
)
from agent_framework._workflows._executor import Executor, handler
from agent_framework._workflows._orchestration_request_info import resolve_request_info_filter
from agent_framework._workflows._workflow_context import WorkflowContext
class DummyExecutor(Executor):
"""Dummy executor with a handler for testing."""
@handler
async def handle(self, data: str, ctx: WorkflowContext[Any, Any]) -> None:
pass
class TestResolveRequestInfoFilter:
"""Tests for resolve_request_info_filter function."""
def test_returns_none_for_none_input(self):
"""Test that None input returns None (no filtering)."""
result = resolve_request_info_filter(None)
assert result is None
def test_returns_none_for_empty_list(self):
"""Test that empty list returns None."""
result = resolve_request_info_filter([])
assert result is None
def test_resolves_string_names(self):
"""Test resolving string agent names."""
result = resolve_request_info_filter(["agent1", "agent2"])
assert result == {"agent1", "agent2"}
def test_resolves_executor_ids(self):
"""Test resolving Executor instances by ID."""
exec1 = DummyExecutor(id="executor1")
exec2 = DummyExecutor(id="executor2")
result = resolve_request_info_filter([exec1, exec2])
assert result == {"executor1", "executor2"}
def test_resolves_agent_names(self):
"""Test resolving AgentProtocol-like objects by name attribute."""
agent1 = MagicMock(spec=AgentProtocol)
agent1.name = "writer"
agent2 = MagicMock(spec=AgentProtocol)
agent2.name = "reviewer"
result = resolve_request_info_filter([agent1, agent2])
assert result == {"writer", "reviewer"}
def test_mixed_types(self):
"""Test resolving a mix of strings, agents, and executors."""
agent = MagicMock(spec=AgentProtocol)
agent.name = "writer"
executor = DummyExecutor(id="custom_exec")
result = resolve_request_info_filter(["manual_name", agent, executor])
assert result == {"manual_name", "writer", "custom_exec"}
def test_skips_agent_without_name(self):
"""Test that agents without names are skipped."""
agent_with_name = MagicMock(spec=AgentProtocol)
agent_with_name.name = "valid"
agent_without_name = MagicMock(spec=AgentProtocol)
agent_without_name.name = None
result = resolve_request_info_filter([agent_with_name, agent_without_name])
assert result == {"valid"}
class TestAgentInputRequest:
"""Tests for AgentInputRequest dataclass (formerly AgentResponseReviewRequest)."""
def test_create_request(self):
"""Test creating an AgentInputRequest with all fields."""
conversation = [ChatMessage(role=Role.USER, text="Hello")]
request = AgentInputRequest(
target_agent_id="test_agent",
conversation=conversation,
instruction="Review this",
metadata={"key": "value"},
)
assert request.target_agent_id == "test_agent"
assert request.conversation == conversation
assert request.instruction == "Review this"
assert request.metadata == {"key": "value"}
def test_create_request_defaults(self):
"""Test creating an AgentInputRequest with default values."""
request = AgentInputRequest(target_agent_id="test_agent")
assert request.target_agent_id == "test_agent"
assert request.conversation == []
assert request.instruction is None
assert request.metadata == {}
def test_backward_compatibility_alias(self):
"""Test that AgentResponseReviewRequest is an alias for AgentInputRequest."""
assert AgentResponseReviewRequest is AgentInputRequest
class TestRequestInfoInterceptor:
"""Tests for RequestInfoInterceptor executor."""
def test_interceptor_creation_generates_unique_id(self):
"""Test creating a RequestInfoInterceptor generates unique IDs."""
interceptor1 = RequestInfoInterceptor()
interceptor2 = RequestInfoInterceptor()
assert interceptor1.id.startswith("request_info_interceptor-")
assert interceptor2.id.startswith("request_info_interceptor-")
assert interceptor1.id != interceptor2.id
def test_interceptor_with_custom_id(self):
"""Test creating a RequestInfoInterceptor with custom ID."""
interceptor = RequestInfoInterceptor(executor_id="custom_review")
assert interceptor.id == "custom_review"
def test_interceptor_with_agent_filter(self):
"""Test creating a RequestInfoInterceptor with agent filter."""
agent_filter = {"agent1", "agent2"}
interceptor = RequestInfoInterceptor(
executor_id="filtered_review",
agent_filter=agent_filter,
)
assert interceptor.id == "filtered_review"
assert interceptor._agent_filter == agent_filter
def test_should_pause_for_agent_no_filter(self):
"""Test that interceptor pauses for all agents when no filter is set."""
interceptor = RequestInfoInterceptor()
assert interceptor._should_pause_for_agent("any_agent") is True
assert interceptor._should_pause_for_agent("another_agent") is True
assert interceptor._should_pause_for_agent(None) is True
def test_should_pause_for_agent_with_filter(self):
"""Test that interceptor only pauses for agents in the filter."""
agent_filter = {"writer", "reviewer"}
interceptor = RequestInfoInterceptor(agent_filter=agent_filter)
assert interceptor._should_pause_for_agent("writer") is True
assert interceptor._should_pause_for_agent("reviewer") is True
assert interceptor._should_pause_for_agent("drafter") is False
assert interceptor._should_pause_for_agent(None) is False
def test_should_pause_for_agent_with_prefixed_id(self):
"""Test that filter matches agent names in prefixed executor IDs."""
agent_filter = {"writer"}
interceptor = RequestInfoInterceptor(agent_filter=agent_filter)
# Should match the name portion after the colon
assert interceptor._should_pause_for_agent("groupchat_agent:writer") is True
assert interceptor._should_pause_for_agent("request_info:writer") is True
assert interceptor._should_pause_for_agent("groupchat_agent:editor") is False
@@ -111,7 +111,8 @@ def test_add_agent_with_custom_parameters():
builder = WorkflowBuilder()
# Add agent with custom parameters
result = builder.add_agent(agent, output_response=True, id="my_custom_id")
with pytest.deprecated_call():
result = builder.add_agent(agent, output_response=True, id="my_custom_id")
# Verify that add_agent returns the builder for chaining
assert result is builder
@@ -133,7 +134,8 @@ def test_add_agent_reuses_same_wrapper():
builder = WorkflowBuilder()
# Add agent with specific parameters
builder.add_agent(agent, output_response=True, id="agent_exec")
with pytest.deprecated_call():
builder.add_agent(agent, output_response=True, id="agent_exec")
# Use the same agent instance in add_edge - should reuse the same wrapper
builder.set_start_executor(agent)
@@ -158,8 +160,9 @@ def test_add_agent_then_use_in_edges():
builder = WorkflowBuilder()
# Add agents with specific settings
builder.add_agent(agent1, output_response=False, id="exec1")
builder.add_agent(agent2, output_response=True, id="exec2")
with pytest.deprecated_call():
builder.add_agent(agent1, output_response=False, id="exec1")
builder.add_agent(agent2, output_response=True, id="exec2")
# Use the same agent instances to create edges
workflow = builder.set_start_executor(agent1).add_edge(agent1, agent2).build()
@@ -183,7 +186,8 @@ def test_add_agent_without_explicit_id_uses_agent_name():
agent = DummyAgent(id="agent_x", name="named_agent")
builder = WorkflowBuilder()
result = builder.add_agent(agent)
with pytest.deprecated_call():
result = builder.add_agent(agent)
# Verify that add_agent returns the builder for chaining
assert result is builder
@@ -203,10 +207,11 @@ def test_add_agent_duplicate_id_raises_error():
builder = WorkflowBuilder()
# Add first agent
builder.add_agent(agent1)
with pytest.deprecated_call():
builder.add_agent(agent1)
# Adding second agent with same name should raise ValueError
with pytest.raises(ValueError, match="Duplicate executor ID"):
with pytest.deprecated_call(), pytest.raises(ValueError, match="Duplicate executor ID"):
builder.add_agent(agent2)
+1 -1
View File
@@ -33,7 +33,7 @@ Try to over-document the samples. This includes comments in the code, README.md
For the getting started samples and the concept samples, we should have the following:
1. A README.md file is included in each set of samples that explains the purpose of the samples and the setup required to run them.
2. A summary should be included at the top of the file that explains the purpose of the sample and required components/concepts to understand the sample. For example:
2. A summary should be included underneath the imports that explains the purpose of the sample and required components/concepts to understand the sample. For example:
```python
'''
@@ -78,9 +78,22 @@ Once comfortable with these, explore the rest of the samples below.
| Sample | File | Concepts |
|---|---|---|
| Human-In-The-Loop (Guessing Game) | [human-in-the-loop/guessing_game_with_human_input.py](./human-in-the-loop/guessing_game_with_human_input.py) | Interactive request/response prompts with a human |
| Azure Agents Tool Feedback Loop | [agents/azure_chat_agents_tool_calls_with_feedback.py](./agents/azure_chat_agents_tool_calls_with_feedback.py) | Two-agent workflow that streams tool calls and pauses for human guidance between passes |
| Human-In-The-Loop (Guessing Game) | [human-in-the-loop/guessing_game_with_human_input.py](./human-in-the-loop/guessing_game_with_human_input.py) | Interactive request/response prompts with a human via `ctx.request_info()` |
| Agents with Approval Requests in Workflows | [human-in-the-loop/agents_with_approval_requests.py](./human-in-the-loop/agents_with_approval_requests.py) | Agents that create approval requests during workflow execution and wait for human approval to proceed |
| SequentialBuilder Request Info | [human-in-the-loop/sequential_request_info.py](./human-in-the-loop/sequential_request_info.py) | Request info for agent responses mid-workflow using `.with_request_info()` on SequentialBuilder |
| ConcurrentBuilder Request Info | [human-in-the-loop/concurrent_request_info.py](./human-in-the-loop/concurrent_request_info.py) | Review concurrent agent outputs before aggregation using `.with_request_info()` on ConcurrentBuilder |
| GroupChatBuilder Request Info | [human-in-the-loop/group_chat_request_info.py](./human-in-the-loop/group_chat_request_info.py) | Steer group discussions with periodic guidance using `.with_request_info()` on GroupChatBuilder |
### tool-approval
Tool approval samples demonstrate using `@ai_function(approval_mode="always_require")` to gate sensitive tool executions with human approval. These work with the high-level builder APIs.
| Sample | File | Concepts |
|---|---|---|
| SequentialBuilder Tool Approval | [tool-approval/sequential_builder_tool_approval.py](./tool-approval/sequential_builder_tool_approval.py) | Sequential workflow with tool approval gates for sensitive operations |
| ConcurrentBuilder Tool Approval | [tool-approval/concurrent_builder_tool_approval.py](./tool-approval/concurrent_builder_tool_approval.py) | Concurrent workflow with tool approvals across parallel agents |
| GroupChatBuilder Tool Approval | [tool-approval/group_chat_builder_tool_approval.py](./tool-approval/group_chat_builder_tool_approval.py) | Group chat workflow with tool approval for multi-agent collaboration |
### observability
@@ -0,0 +1,198 @@
# Copyright (c) Microsoft. All rights reserved.
"""
Sample: Request Info with ConcurrentBuilder
This sample demonstrates using the `.with_request_info()` method to pause a
ConcurrentBuilder workflow AFTER all parallel agents complete but BEFORE
aggregation, allowing human review and modification of the combined results.
Purpose:
Show how to use the request info API that pauses after concurrent agents run,
allowing review and steering of results before they are aggregated.
Demonstrate:
- Configuring request info with `.with_request_info()`
- Reviewing outputs from multiple concurrent agents
- Injecting human guidance after agents execute but before aggregation
Prerequisites:
- Azure OpenAI configured for AzureOpenAIChatClient with required environment variables
- Authentication via azure-identity (run az login before executing)
"""
import asyncio
from typing import Any
from agent_framework import (
AgentInputRequest,
ChatMessage,
ConcurrentBuilder,
RequestInfoEvent,
Role,
WorkflowOutputEvent,
WorkflowRunState,
WorkflowStatusEvent,
)
from agent_framework._workflows._agent_executor import AgentExecutorResponse
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
# Store chat client at module level for aggregator access
_chat_client: AzureOpenAIChatClient | None = None
async def aggregate_with_synthesis(results: list[AgentExecutorResponse]) -> Any:
"""Custom aggregator that synthesizes concurrent agent outputs using an LLM.
This aggregator extracts the outputs from each parallel agent and uses the
chat client to create a unified summary, incorporating any human feedback
that was injected into the conversation.
Args:
results: List of responses from all concurrent agents
Returns:
The synthesized summary text
"""
if not _chat_client:
return "Error: Chat client not initialized"
# Extract each agent's final output
expert_sections: list[str] = []
human_guidance = ""
for r in results:
try:
messages = getattr(r.agent_run_response, "messages", [])
final_text = messages[-1].text if messages and hasattr(messages[-1], "text") else "(no content)"
expert_sections.append(f"{getattr(r, 'executor_id', 'analyst')}:\n{final_text}")
# Check for human feedback in the conversation (will be last user message if present)
if r.full_conversation:
for msg in reversed(r.full_conversation):
if msg.role == Role.USER and msg.text and "perspectives" not in msg.text.lower():
human_guidance = msg.text
break
except Exception:
expert_sections.append(f"{getattr(r, 'executor_id', 'analyst')}: (error extracting output)")
# Build prompt with human guidance if provided
guidance_text = f"\n\nHuman guidance: {human_guidance}" if human_guidance else ""
system_msg = ChatMessage(
Role.SYSTEM,
text=(
"You are a synthesis expert. Consolidate the following analyst perspectives "
"into one cohesive, balanced summary (3-4 sentences). If human guidance is provided, "
"prioritize aspects as directed."
),
)
user_msg = ChatMessage(Role.USER, text="\n\n".join(expert_sections) + guidance_text)
response = await _chat_client.get_response([system_msg, user_msg])
return response.messages[-1].text if response.messages else ""
async def main() -> None:
global _chat_client
_chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Create agents that analyze from different perspectives
technical_analyst = _chat_client.create_agent(
name="technical_analyst",
instructions=(
"You are a technical analyst. When given a topic, provide a technical "
"perspective focusing on implementation details, performance, and architecture. "
"Keep your analysis to 2-3 sentences."
),
)
business_analyst = _chat_client.create_agent(
name="business_analyst",
instructions=(
"You are a business analyst. When given a topic, provide a business "
"perspective focusing on ROI, market impact, and strategic value. "
"Keep your analysis to 2-3 sentences."
),
)
user_experience_analyst = _chat_client.create_agent(
name="ux_analyst",
instructions=(
"You are a UX analyst. When given a topic, provide a user experience "
"perspective focusing on usability, accessibility, and user satisfaction. "
"Keep your analysis to 2-3 sentences."
),
)
# Build workflow with request info enabled and custom aggregator
workflow = (
ConcurrentBuilder()
.participants([technical_analyst, business_analyst, user_experience_analyst])
.with_aggregator(aggregate_with_synthesis)
.with_request_info()
.build()
)
# Run the workflow with human-in-the-loop
pending_responses: dict[str, str] | None = None
workflow_complete = False
print("Starting multi-perspective analysis workflow...")
print("=" * 60)
while not workflow_complete:
# Run or continue the workflow
stream = (
workflow.send_responses_streaming(pending_responses)
if pending_responses
else workflow.run_stream("Analyze the impact of large language models on software development.")
)
pending_responses = None
# Process events
async for event in stream:
if isinstance(event, RequestInfoEvent):
if isinstance(event.data, AgentInputRequest):
# Display pre-execution context for steering concurrent agents
print("\n" + "-" * 40)
print("INPUT REQUESTED (BEFORE CONCURRENT AGENTS)")
print("-" * 40)
print(f"About to call agents: {event.data.target_agent_id}")
print("Conversation context:")
recent = (
event.data.conversation[-2:] if len(event.data.conversation) > 2 else event.data.conversation
)
for msg in recent:
role = msg.role.value if msg.role else "unknown"
text = (msg.text or "")[:150]
print(f" [{role}]: {text}...")
print("-" * 40)
# Get human input to steer all agents
user_input = input("Your guidance for the analysts (or 'skip' to continue): ") # noqa: ASYNC250
if user_input.lower() == "skip":
user_input = "Please analyze objectively from your unique perspective."
pending_responses = {event.request_id: user_input}
print("(Resuming workflow...)")
elif isinstance(event, WorkflowOutputEvent):
print("\n" + "=" * 60)
print("WORKFLOW COMPLETE")
print("=" * 60)
print("Aggregated output:")
# Custom aggregator returns a string
if event.data:
print(event.data)
workflow_complete = True
elif isinstance(event, WorkflowStatusEvent):
if event.state == WorkflowRunState.IDLE:
workflow_complete = True
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,175 @@
# Copyright (c) Microsoft. All rights reserved.
"""
Sample: Request Info with GroupChatBuilder
This sample demonstrates using the `.with_request_info()` method to pause a
GroupChatBuilder workflow BEFORE specific participants speak. By using the
`agents=` filter parameter, you can target only certain participants rather
than pausing before every turn.
Purpose:
Show how to use the request info API with selective filtering to pause before
specific participants speak, allowing human input to steer their response.
Demonstrate:
- Configuring request info with `.with_request_info(agents=[...])`
- Using agent filtering to reduce interruptions
- Steering agent behavior with pre-agent human input
Prerequisites:
- Azure OpenAI configured for AzureOpenAIChatClient with required environment variables
- Authentication via azure-identity (run az login before executing)
"""
import asyncio
from agent_framework import (
AgentInputRequest,
AgentRunUpdateEvent,
ChatMessage,
GroupChatBuilder,
RequestInfoEvent,
WorkflowOutputEvent,
WorkflowRunState,
WorkflowStatusEvent,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
async def main() -> None:
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Create agents for a group discussion
optimist = chat_client.create_agent(
name="optimist",
instructions=(
"You are an optimistic team member. You see opportunities and potential "
"in ideas. Engage constructively with the discussion, building on others' "
"points while maintaining a positive outlook. Keep responses to 2-3 sentences."
),
)
pragmatist = chat_client.create_agent(
name="pragmatist",
instructions=(
"You are a pragmatic team member. You focus on practical implementation "
"and realistic timelines. Sometimes you disagree with overly optimistic views. "
"Keep responses to 2-3 sentences."
),
)
creative = chat_client.create_agent(
name="creative",
instructions=(
"You are a creative team member. You propose innovative solutions and "
"think outside the box. You may suggest alternatives to conventional approaches. "
"Keep responses to 2-3 sentences."
),
)
# Manager orchestrates the discussion
manager = chat_client.create_agent(
name="manager",
instructions=(
"You are a discussion manager coordinating a team conversation between optimist, "
"pragmatist, and creative. Your job is to select who speaks next.\n\n"
"RULES:\n"
"1. Rotate through ALL participants - do not favor any single participant\n"
"2. Each participant should speak at least once before any participant speaks twice\n"
"3. If human feedback redirects the topic, acknowledge it and continue rotating\n"
"4. Continue for at least 5 participant turns before concluding\n"
"5. Do NOT select the same participant twice in a row"
),
)
# Build workflow with request info enabled
# Using agents= filter to only pause before pragmatist speaks (not every turn)
workflow = (
GroupChatBuilder()
.set_manager(manager=manager, display_name="Discussion Manager")
.participants([optimist, pragmatist, creative])
.with_max_rounds(6)
.with_request_info(agents=[pragmatist]) # Only pause before pragmatist speaks
.build()
)
# Run the workflow with human-in-the-loop
pending_responses: dict[str, str] | None = None
workflow_complete = False
current_agent: str | None = None # Track current streaming agent
print("Starting group discussion workflow...")
print("=" * 60)
while not workflow_complete:
# Run or continue the workflow
stream = (
workflow.send_responses_streaming(pending_responses)
if pending_responses
else workflow.run_stream(
"Discuss how our team should approach adopting AI tools for productivity. "
"Consider benefits, risks, and implementation strategies."
)
)
pending_responses = None
# Process events
async for event in stream:
if isinstance(event, AgentRunUpdateEvent):
# Show all agent responses as they stream
if event.data and event.data.text:
agent_name = event.data.author_name or "unknown"
# Print agent name header only when agent changes
if agent_name != current_agent:
current_agent = agent_name
print(f"\n[{agent_name}]: ", end="", flush=True)
print(event.data.text, end="", flush=True)
elif isinstance(event, RequestInfoEvent):
current_agent = None # Reset for next agent
if isinstance(event.data, AgentInputRequest):
# Display pre-agent context for human input
print("\n" + "-" * 40)
print("INPUT REQUESTED")
print(f"About to call agent: {event.data.target_agent_id}")
print("-" * 40)
print("Conversation context:")
recent = (
event.data.conversation[-3:] if len(event.data.conversation) > 3 else event.data.conversation
)
for msg in recent:
role = msg.role.value if msg.role else "unknown"
text = (msg.text or "")[:100]
print(f" [{role}]: {text}...")
print("-" * 40)
# Get human input to steer the agent
user_input = input("Steer the discussion (or 'skip' to continue): ") # noqa: ASYNC250
if user_input.lower() == "skip":
user_input = "Please continue the discussion naturally."
pending_responses = {event.request_id: user_input}
print("(Resuming discussion...)")
elif isinstance(event, WorkflowOutputEvent):
print("\n" + "=" * 60)
print("DISCUSSION COMPLETE")
print("=" * 60)
print("Final conversation:")
if event.data:
messages: list[ChatMessage] = event.data[-4:]
for msg in messages:
role = msg.role.value if msg.role else "unknown"
text = (msg.text or "")[:200]
print(f"[{role}]: {text}...")
workflow_complete = True
elif isinstance(event, WorkflowStatusEvent) and event.state == WorkflowRunState.IDLE:
workflow_complete = True
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,128 @@
# Copyright (c) Microsoft. All rights reserved.
"""
Sample: Request Info with SequentialBuilder
This sample demonstrates using the `.with_request_info()` method to pause a
SequentialBuilder workflow BEFORE each agent runs, allowing external input
(e.g., human steering) before the agent responds.
Purpose:
Show how to use the request info API that pauses before every agent response,
using the standard request_info pattern for consistency.
Demonstrate:
- Configuring request info with `.with_request_info()`
- Handling RequestInfoEvent with AgentInputRequest data
- Injecting responses back into the workflow via send_responses_streaming
Prerequisites:
- Azure OpenAI configured for AzureOpenAIChatClient with required environment variables
- Authentication via azure-identity (run az login before executing)
"""
import asyncio
from agent_framework import (
AgentInputRequest,
ChatMessage,
RequestInfoEvent,
SequentialBuilder,
WorkflowOutputEvent,
WorkflowRunState,
WorkflowStatusEvent,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
async def main() -> None:
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Create agents for a sequential document review workflow
drafter = chat_client.create_agent(
name="drafter",
instructions=("You are a document drafter. When given a topic, create a brief draft (2-3 sentences)."),
)
editor = chat_client.create_agent(
name="editor",
instructions=(
"You are an editor. Review the draft and suggest improvements. "
"Incorporate any human feedback that was provided."
),
)
finalizer = chat_client.create_agent(
name="finalizer",
instructions=(
"You are a finalizer. Take the edited content and create a polished final version. "
"Incorporate any additional feedback provided."
),
)
# Build workflow with request info enabled (pauses before each agent)
workflow = SequentialBuilder().participants([drafter, editor, finalizer]).with_request_info().build()
# Run the workflow with request info handling
pending_responses: dict[str, str] | None = None
workflow_complete = False
print("Starting document review workflow...")
print("=" * 60)
while not workflow_complete:
# Run or continue the workflow
stream = (
workflow.send_responses_streaming(pending_responses)
if pending_responses
else workflow.run_stream("Write a brief introduction to artificial intelligence.")
)
pending_responses = None
# Process events
async for event in stream:
if isinstance(event, RequestInfoEvent):
if isinstance(event.data, AgentInputRequest):
# Display pre-agent context for steering
print("\n" + "-" * 40)
print("REQUEST INFO: INPUT REQUESTED")
print(f"About to call agent: {event.data.target_agent_id}")
print("-" * 40)
print("Conversation context:")
recent = (
event.data.conversation[-2:] if len(event.data.conversation) > 2 else event.data.conversation
)
for msg in recent:
role = msg.role.value if msg.role else "unknown"
text = (msg.text or "")[:150]
print(f" [{role}]: {text}...")
print("-" * 40)
# Get input to steer the agent
user_input = input("Your guidance (or 'skip' to continue): ") # noqa: ASYNC250
if user_input.lower() == "skip":
user_input = "Please continue naturally."
pending_responses = {event.request_id: user_input}
print("(Resuming workflow...)")
elif isinstance(event, WorkflowOutputEvent):
print("\n" + "=" * 60)
print("WORKFLOW COMPLETE")
print("=" * 60)
print("Final output:")
if event.data:
messages: list[ChatMessage] = event.data[-3:]
for msg in messages:
role = msg.role.value if msg.role else "unknown"
print(f"[{role}]: {msg.text}")
workflow_complete = True
elif isinstance(event, WorkflowStatusEvent) and event.state == WorkflowRunState.IDLE:
workflow_complete = True
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,183 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Annotated
from agent_framework import (
ChatMessage,
ConcurrentBuilder,
FunctionApprovalRequestContent,
FunctionApprovalResponseContent,
RequestInfoEvent,
WorkflowOutputEvent,
WorkflowRunState,
WorkflowStatusEvent,
ai_function,
)
from agent_framework.openai import OpenAIChatClient
"""
Sample: Concurrent Workflow with Tool Approval Requests
This sample demonstrates how to use ConcurrentBuilder with tools that require human
approval before execution. Multiple agents run in parallel, and any tool requiring
approval will pause the workflow until the human responds.
This sample works as follows:
1. A ConcurrentBuilder workflow is created with two agents running in parallel.
2. One agent has a tool requiring approval (financial transaction).
3. The other agent has only non-approval tools (market data lookup).
4. Both agents receive the same task and work concurrently.
5. When the financial agent tries to execute a trade, it triggers an approval request.
6. The sample simulates human approval and the workflow completes.
7. Results from both agents are aggregated and output.
Purpose:
Show how tool call approvals work in parallel execution scenarios where only some
agents have sensitive tools.
Demonstrate:
- Combining agents with and without approval-required tools in concurrent workflows.
- Handling RequestInfoEvent during concurrent agent execution.
- Understanding that approval pauses only the agent that triggered it, not all agents.
Prerequisites:
- OpenAI or Azure OpenAI configured with the required environment variables.
- Basic familiarity with ConcurrentBuilder and streaming workflow events.
"""
# 1. Define tools for the research agent (no approval required)
@ai_function
def get_stock_price(symbol: Annotated[str, "The stock ticker symbol"]) -> str:
"""Get the current stock price for a given symbol."""
# Mock data for demonstration
prices = {"AAPL": 175.50, "GOOGL": 140.25, "MSFT": 378.90, "AMZN": 178.75}
price = prices.get(symbol.upper(), 100.00)
return f"{symbol.upper()}: ${price:.2f}"
@ai_function
def get_market_sentiment(symbol: Annotated[str, "The stock ticker symbol"]) -> str:
"""Get market sentiment analysis for a stock."""
# Mock sentiment data
return f"Market sentiment for {symbol.upper()}: Bullish (72% positive mentions in last 24h)"
# 2. Define tools for the trading agent (approval required for trades)
@ai_function(approval_mode="always_require")
def execute_trade(
symbol: Annotated[str, "The stock ticker symbol"],
action: Annotated[str, "Either 'buy' or 'sell'"],
quantity: Annotated[int, "Number of shares to trade"],
) -> str:
"""Execute a stock trade. Requires human approval due to financial impact."""
return f"Trade executed: {action.upper()} {quantity} shares of {symbol.upper()}"
@ai_function
def get_portfolio_balance() -> str:
"""Get current portfolio balance and available funds."""
return "Portfolio: $50,000 invested, $10,000 cash available"
async def main() -> None:
# 3. Create two agents with different tool sets
chat_client = OpenAIChatClient()
research_agent = chat_client.create_agent(
name="ResearchAgent",
instructions=(
"You are a market research analyst. Analyze stock data and provide "
"recommendations based on price and sentiment. Do not execute trades."
),
tools=[get_stock_price, get_market_sentiment],
)
trading_agent = chat_client.create_agent(
name="TradingAgent",
instructions=(
"You are a trading assistant. When asked to buy or sell shares, you MUST "
"call the execute_trade function to complete the transaction. Check portfolio "
"balance first, then execute the requested trade."
),
tools=[get_portfolio_balance, execute_trade],
)
# 4. Build a concurrent workflow with both agents
# ConcurrentBuilder requires at least 2 participants for fan-out
workflow = ConcurrentBuilder().participants([research_agent, trading_agent]).build()
# 5. Start the workflow - both agents will process the same task in parallel
print("Starting concurrent workflow with tool approval...")
print("Two agents will analyze MSFT - one for research, one for trading.")
print("-" * 60)
# Phase 1: Run workflow and collect all events (stream ends at IDLE or IDLE_WITH_PENDING_REQUESTS)
request_info_events: list[RequestInfoEvent] = []
workflow_completed_without_approvals = False
async for event in workflow.run_stream("Analyze MSFT stock and if sentiment is positive, buy 10 shares."):
if isinstance(event, RequestInfoEvent):
request_info_events.append(event)
if isinstance(event.data, FunctionApprovalRequestContent):
print(f"\nApproval requested for tool: {event.data.function_call.name}")
print(f" Arguments: {event.data.function_call.arguments}")
elif isinstance(event, WorkflowStatusEvent) and event.state == WorkflowRunState.IDLE:
workflow_completed_without_approvals = True
# 6. Handle approval requests (if any)
if request_info_events:
responses: dict[str, FunctionApprovalResponseContent] = {}
for request_event in request_info_events:
if isinstance(request_event.data, FunctionApprovalRequestContent):
print(f"\nSimulating human approval for: {request_event.data.function_call.name}")
# Create approval response
responses[request_event.request_id] = request_event.data.create_response(approved=True)
if responses:
# Phase 2: Send all approvals and continue workflow
output: list[ChatMessage] | None = None
async for event in workflow.send_responses_streaming(responses):
if isinstance(event, WorkflowOutputEvent):
output = event.data
if output:
print("\n" + "-" * 60)
print("Workflow completed. Aggregated results from both agents:")
for msg in output:
if hasattr(msg, "author_name") and msg.author_name:
print(f"\n[{msg.author_name}]:")
text = msg.text[:300] + "..." if len(msg.text) > 300 else msg.text
if text:
print(f" {text}")
elif workflow_completed_without_approvals:
print("\nWorkflow completed without requiring approvals.")
print("(The trading agent may have only checked balance without executing a trade)")
"""
Sample Output:
Starting concurrent workflow with tool approval...
Two agents will analyze MSFT - one for research, one for trading.
------------------------------------------------------------
Approval requested for tool: execute_trade
Arguments: {"symbol": "MSFT", "action": "buy", "quantity": 10}
Simulating human approval for: execute_trade
------------------------------------------------------------
Workflow completed. Aggregated results from both agents:
[ResearchAgent]:
MSFT is currently trading at $175.50 with bullish market sentiment
(72% positive mentions). Based on the positive sentiment, this could
be a good opportunity to consider buying.
[TradingAgent]:
I've checked your portfolio balance ($10,000 cash available) and
executed the trade: BUY 10 shares of MSFT at approximately $175.50
per share, totaling ~$1,755.
"""
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,206 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Annotated
from agent_framework import (
FunctionApprovalRequestContent,
GroupChatBuilder,
GroupChatStateSnapshot,
RequestInfoEvent,
ai_function,
)
from agent_framework.openai import OpenAIChatClient
"""
Sample: Group Chat Workflow with Tool Approval Requests
This sample demonstrates how to use GroupChatBuilder with tools that require human
approval before execution. A group of specialized agents collaborate on a task, and
sensitive tool calls trigger human-in-the-loop approval.
This sample works as follows:
1. A GroupChatBuilder workflow is created with multiple specialized agents.
2. A selector function determines which agent speaks next based on conversation state.
3. Agents collaborate on a software deployment task.
4. When the deployment agent tries to deploy to production, it triggers an approval request.
5. The sample simulates human approval and the workflow completes.
Purpose:
Show how tool call approvals integrate with multi-agent group chat workflows where
different agents have different levels of tool access.
Demonstrate:
- Using set_select_speakers_func with agents that have approval-required tools.
- Handling RequestInfoEvent in group chat scenarios.
- Multi-round group chat with tool approval interruption and resumption.
Prerequisites:
- OpenAI or Azure OpenAI configured with the required environment variables.
- Basic familiarity with GroupChatBuilder and streaming workflow events.
"""
# 1. Define tools for different agents
@ai_function
def run_tests(test_suite: Annotated[str, "Name of the test suite to run"]) -> str:
"""Run automated tests for the application."""
return f"Test suite '{test_suite}' completed: 47 passed, 0 failed, 0 skipped"
@ai_function
def check_staging_status() -> str:
"""Check the current status of the staging environment."""
return "Staging environment: Healthy, Version 2.3.0 deployed, All services running"
@ai_function(approval_mode="always_require")
def deploy_to_production(
version: Annotated[str, "The version to deploy"],
components: Annotated[str, "Comma-separated list of components to deploy"],
) -> str:
"""Deploy specified components to production. Requires human approval."""
return f"Production deployment complete: Version {version}, Components: {components}"
@ai_function
def create_rollback_plan(version: Annotated[str, "The version being deployed"]) -> str:
"""Create a rollback plan for the deployment."""
return (
f"Rollback plan created for version {version}: "
"Automated rollback to v2.2.0 if health checks fail within 5 minutes"
)
# 2. Define the speaker selector function
def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:
"""Select the next speaker based on the conversation flow.
This simple selector follows a predefined flow:
1. QA Engineer runs tests
2. DevOps Engineer checks staging and creates rollback plan
3. DevOps Engineer deploys to production (triggers approval)
"""
round_index: int = state["round_index"]
# Define the conversation flow
speaker_order: list[str] = [
"QAEngineer", # Round 0: Run tests
"DevOpsEngineer", # Round 1: Check staging, create rollback
"DevOpsEngineer", # Round 2: Deploy to production (approval required)
]
if round_index >= len(speaker_order):
return None # End the conversation
return speaker_order[round_index]
async def main() -> None:
# 3. Create specialized agents
chat_client = OpenAIChatClient()
qa_engineer = chat_client.create_agent(
name="QAEngineer",
instructions=(
"You are a QA engineer responsible for running tests before deployment. "
"Run the appropriate test suites and report results clearly."
),
tools=[run_tests],
)
devops_engineer = chat_client.create_agent(
name="DevOpsEngineer",
instructions=(
"You are a DevOps engineer responsible for deployments. First check staging "
"status and create a rollback plan, then proceed with production deployment. "
"Always ensure safety measures are in place before deploying."
),
tools=[check_staging_status, create_rollback_plan, deploy_to_production],
)
# 4. Build a group chat workflow with the selector function
workflow = (
GroupChatBuilder()
# Optionally, use `.set_manager(...)` to customize the group chat manager
.set_select_speakers_func(select_next_speaker)
.participants([qa_engineer, devops_engineer])
.with_max_rounds(5)
.build()
)
# 5. Start the workflow
print("Starting group chat workflow for software deployment...")
print("Agents: QA Engineer, DevOps Engineer")
print("-" * 60)
# Phase 1: Run workflow and collect all events (stream ends at IDLE or IDLE_WITH_PENDING_REQUESTS)
request_info_events: list[RequestInfoEvent] = []
async for event in workflow.run_stream(
"We need to deploy version 2.4.0 to production. Please coordinate the deployment."
):
if isinstance(event, RequestInfoEvent):
request_info_events.append(event)
if isinstance(event.data, FunctionApprovalRequestContent):
print("\n[APPROVAL REQUIRED]")
print(f" Tool: {event.data.function_call.name}")
print(f" Arguments: {event.data.function_call.arguments}")
# 6. Handle approval requests
if request_info_events:
for request_event in request_info_events:
if isinstance(request_event.data, FunctionApprovalRequestContent):
print("\n" + "=" * 60)
print("Human review required for production deployment!")
print("In a real scenario, you would review the deployment details here.")
print("Simulating approval for demo purposes...")
print("=" * 60)
# Create approval response
approval_response = request_event.data.create_response(approved=True)
# Phase 2: Send approval and continue workflow
async for _ in workflow.send_responses_streaming({request_event.request_id: approval_response}):
pass # Consume all events
print("\n" + "-" * 60)
print("Deployment workflow completed successfully!")
print("All agents have finished their tasks.")
else:
print("\nWorkflow completed without requiring production deployment approval.")
"""
Sample Output:
Starting group chat workflow for software deployment...
Agents: QA Engineer, DevOps Engineer
------------------------------------------------------------
[QAEngineer]: Running the integration test suite to verify the application
before deployment... Test suite 'integration' completed: 47 passed, 0 failed.
All tests passing - ready for deployment.
[DevOpsEngineer]: Checking staging environment status... Staging is healthy
with version 2.3.0. Creating rollback plan for version 2.4.0... Rollback plan
created with automated rollback to v2.2.0 if health checks fail.
[APPROVAL REQUIRED]
Tool: deploy_to_production
Arguments: {"version": "2.4.0", "components": "api,web,worker"}
============================================================
Human review required for production deployment!
In a real scenario, you would review the deployment details here.
Simulating approval for demo purposes...
============================================================
[DevOpsEngineer]: Production deployment complete! Version 2.4.0 has been
successfully deployed with components: api, web, worker.
------------------------------------------------------------
Deployment workflow completed successfully!
All agents have finished their tasks.
"""
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,144 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Annotated
from agent_framework import (
ChatMessage,
FunctionApprovalRequestContent,
RequestInfoEvent,
SequentialBuilder,
WorkflowOutputEvent,
ai_function,
)
from agent_framework.openai import OpenAIChatClient
"""
Sample: Sequential Workflow with Tool Approval Requests
This sample demonstrates how to use SequentialBuilder with tools that require human
approval before execution. The approval flow uses the existing @ai_function decorator
with approval_mode="always_require" to trigger human-in-the-loop interactions.
This sample works as follows:
1. A SequentialBuilder workflow is created with a single agent that has tools requiring approval.
2. The agent receives a user task and determines it needs to call a sensitive tool.
3. The tool call triggers a FunctionApprovalRequestContent, pausing the workflow.
4. The sample simulates human approval by responding to the RequestInfoEvent.
5. Once approved, the tool executes and the agent completes its response.
6. The workflow outputs the final conversation with all messages.
Purpose:
Show how tool call approvals integrate seamlessly with SequentialBuilder without
requiring any additional builder configuration.
Demonstrate:
- Using @ai_function(approval_mode="always_require") for sensitive operations.
- Handling RequestInfoEvent with FunctionApprovalRequestContent in sequential workflows.
- Resuming workflow execution after approval via send_responses_streaming.
Prerequisites:
- OpenAI or Azure OpenAI configured with the required environment variables.
- Basic familiarity with SequentialBuilder and streaming workflow events.
"""
# 1. Define tools - one requiring approval, one that doesn't
@ai_function(approval_mode="always_require")
def execute_database_query(
query: Annotated[str, "The SQL query to execute against the production database"],
) -> str:
"""Execute a SQL query against the production database. Requires human approval."""
# In a real implementation, this would execute the query
return f"Query executed successfully. Results: 3 rows affected by '{query}'"
@ai_function
def get_database_schema() -> str:
"""Get the current database schema. Does not require approval."""
return """
Tables:
- users (id, name, email, created_at)
- orders (id, user_id, total, status, created_at)
- products (id, name, price, stock)
"""
async def main() -> None:
# 2. Create the agent with tools (approval mode is set per-tool via decorator)
chat_client = OpenAIChatClient()
database_agent = chat_client.create_agent(
name="DatabaseAgent",
instructions=(
"You are a database assistant. You can view the database schema and execute "
"queries. Always check the schema before running queries. Be careful with "
"queries that modify data."
),
tools=[get_database_schema, execute_database_query],
)
# 3. Build a sequential workflow with the agent
workflow = SequentialBuilder().participants([database_agent]).build()
# 4. Start the workflow with a user task
print("Starting sequential workflow with tool approval...")
print("-" * 60)
# Phase 1: Run workflow and collect all events (stream ends at IDLE or IDLE_WITH_PENDING_REQUESTS)
request_info_events: list[RequestInfoEvent] = []
async for event in workflow.run_stream(
"Check the schema and then update all orders with status 'pending' to 'processing'"
):
if isinstance(event, RequestInfoEvent):
request_info_events.append(event)
if isinstance(event.data, FunctionApprovalRequestContent):
print(f"\nApproval requested for tool: {event.data.function_call.name}")
print(f" Arguments: {event.data.function_call.arguments}")
# 5. Handle approval requests
if request_info_events:
for request_event in request_info_events:
if isinstance(request_event.data, FunctionApprovalRequestContent):
# In a real application, you would prompt the user here
print("\nSimulating human approval (auto-approving for demo)...")
# Create approval response
approval_response = request_event.data.create_response(approved=True)
# Phase 2: Send approval and continue workflow
output: list[ChatMessage] | None = None
async for event in workflow.send_responses_streaming({request_event.request_id: approval_response}):
if isinstance(event, WorkflowOutputEvent):
output = event.data
if output:
print("\n" + "-" * 60)
print("Workflow completed. Final conversation:")
for msg in output:
role = msg.role.value if hasattr(msg.role, "value") else msg.role
text = msg.text[:200] + "..." if len(msg.text) > 200 else msg.text
print(f" [{role}]: {text}")
else:
print("No approval requests were generated (schema check may have been sufficient).")
"""
Sample Output:
Starting sequential workflow with tool approval...
------------------------------------------------------------
Approval requested for tool: execute_database_query
Arguments: {"query": "UPDATE orders SET status = 'processing' WHERE status = 'pending'"}
Simulating human approval (auto-approving for demo)...
------------------------------------------------------------
Workflow completed. Final conversation:
[user]: Check the schema and then update all orders with status 'pending' to 'processing'
[assistant]: I've checked the schema and executed the update query. The query
"UPDATE orders SET status = 'processing' WHERE status = 'pending'"
was executed successfully, affecting 3 rows.
"""
if __name__ == "__main__":
asyncio.run(main())