agent-framework/python/samples/03-workflows/human-in-the-loop/agents_with_HITL.py
Eduard van Valkenburg 67ce1baecf Python: fix reasoning model workflow handoff and history serialization (#4083)
* fix: strip function_call and text_reasoning from cross-agent workflow handoff

When a reasoning model (e.g. gpt-5-mini) runs as Agent 1 in a workflow, its
response includes text_reasoning items (with server-scoped IDs like rs_XXXX)
and function_call items. Forwarding these to Agent 2 in a fresh conversation
caused API errors because the reasoning/call IDs are scoped to the original
stored response context.

Changes:
- Strip 'function_call', 'text_reasoning', 'function_approval_request', and
  'function_approval_response' from handoff messages in _agent_executor.py
- Keep 'function_result' so the actual tool output content is preserved for
  the next agent's context
- Update unit tests to reflect that function_result messages survive handoff
  (messages grow from 2→3: user, tool(result), assistant(summary))
- Fix incorrect test assertions in test_function_invocation_stop_clears_*
  that assumed the client layer updates session.service_session_id
- Also fixed _extract_function_calls to search all messages with call_id
  deduplication, and the error-limit stop path to submit function_call_output
  items before halting (via tool_choice=none cleanup call)
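
The handoff filtering listed above can be sketched roughly as follows. This is an illustration only, not the code in _agent_executor.py: `Content`, `Message`, and `strip_for_handoff` are hypothetical stand-ins, and dropping messages that end up with no content is an assumption.

```python
from dataclasses import dataclass, field

# Content types the commit message says are stripped from handoff messages.
STRIPPED_TYPES = {
    "function_call",
    "text_reasoning",
    "function_approval_request",
    "function_approval_response",
}


@dataclass
class Content:  # hypothetical stand-in for the framework's content type
    type: str
    text: str = ""


@dataclass
class Message:  # hypothetical stand-in for the framework's message type
    role: str
    contents: list[Content] = field(default_factory=list)


def strip_for_handoff(messages: list[Message]) -> list[Message]:
    """Return copies of the messages without server-scoped content items."""
    cleaned: list[Message] = []
    for message in messages:
        kept = [c for c in message.contents if c.type not in STRIPPED_TYPES]
        if kept:  # assumption: messages left empty are dropped entirely
            cleaned.append(Message(message.role, kept))
    return cleaned


history = [
    Message("user", [Content("text", "Weather in Oslo?")]),
    Message("assistant", [Content("text_reasoning"), Content("function_call")]),
    Message("tool", [Content("function_result", "4C, cloudy")]),
    Message("assistant", [Content("text", "It is 4C and cloudy.")]),
]
handoff = strip_for_handoff(history)
print([m.role for m in handoff])  # ['user', 'tool', 'assistant']
```

With this toy history the function_result message survives, which matches the user, tool(result), assistant(summary) shape the updated tests expect.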

Relates to: https://github.com/microsoft/agent-framework/issues/4047

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix: reasoning model workflow handoff and history serialization

Fixes multiple related issues when using reasoning models (gpt-5-mini,
gpt-5.2) in multi-agent workflows that chain agents via from_response
or replay full conversation history via AgentExecutorRequest.

## Reasoning items always emitted on output_item.added

When a reasoning model produces encrypted or hidden reasoning (no
visible text), the Responses API still fires a reasoning output item
without any reasoning_text.delta events. Previously no text_reasoning
Content was emitted in that case, making it invisible to downstream
logic. Both the non-streaming (_parse_response_from_openai) and
streaming (output_item.added) paths now always emit at least one
text_reasoning Content — with empty text if no content is available —
so co-occurrence detection and serialization guards work reliably.
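
A rough sketch of the "always emit at least one text_reasoning Content" rule, using plain dicts. The item shape (`content`, `encrypted_content`) and the helper name `reasoning_contents` are assumptions made for illustration; the real parsing lives in the client code named above.

```python
def reasoning_contents(item: dict) -> list[dict]:
    """Map one reasoning output item to a list of text_reasoning contents."""
    texts = [part.get("text", "") for part in item.get("content") or []]
    if not texts:
        # Hidden or encrypted reasoning: still emit one placeholder with
        # empty text so downstream checks can see that reasoning occurred.
        texts = [""]
    return [{"type": "text_reasoning", "id": item.get("id"), "text": t} for t in texts]


# Illustrative items; the ids and field values are made up.
visible = {"id": "rs_1", "type": "reasoning", "content": [{"type": "reasoning_text", "text": "Check units."}]}
hidden = {"id": "rs_2", "type": "reasoning", "encrypted_content": "opaque"}

print(reasoning_contents(visible))
print(reasoning_contents(hidden))
```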

## Reasoning items only serialized when paired with a function_call

The Responses API only accepts reasoning items in input when they
directly preceded a function_call in the original response. Sending a
reasoning item that preceded a text response (no tool call) causes:
  "reasoning was provided without its required following item"
_prepare_message_for_openai now checks has_function_call per message
and skips text_reasoning serialization when there is no accompanying
function_call.
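
The per-message guard described above might look something like this. It is a simplified sketch over plain dicts, not the actual _prepare_message_for_openai; `prepare_message` is a hypothetical name.

```python
def prepare_message(contents: list[dict]) -> list[dict]:
    """Keep text_reasoning items only if the same message has a function_call."""
    has_function_call = any(c["type"] == "function_call" for c in contents)
    items: list[dict] = []
    for content in contents:
        if content["type"] == "text_reasoning" and not has_function_call:
            # Reasoning that preceded a plain text answer is not sent back.
            continue
        items.append(content)
    return items


with_call = [{"type": "text_reasoning"}, {"type": "function_call"}]
text_only = [{"type": "text_reasoning"}, {"type": "text"}]

print([c["type"] for c in prepare_message(with_call)])
print([c["type"] for c in prepare_message(text_only)])
```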

## summary field is an array, not an object

The reasoning item summary field sent to the Responses API must be an
array of objects ([{"type": "summary_text", "text": ...}]), not a
single object. Fixed _prepare_content_for_openai accordingly.
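
For illustration, the array shape described above could be built like this. `build_reasoning_item` is a hypothetical helper, and the surrounding `type` and `id` fields are included only to give the summary some context.

```python
def build_reasoning_item(item_id: str, summary_text: str) -> dict:
    """Build a reasoning input item whose summary is a list of objects."""
    return {
        "type": "reasoning",
        "id": item_id,
        # A list of summary objects, not a single object.
        "summary": [{"type": "summary_text", "text": summary_text}],
    }


item = build_reasoning_item("rs_example", "Compared two options.")
print(item["summary"])
```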

## service_session_id cleared when explicit history is provided

When a workflow coordinator replays a full conversation (including
function calls from a previous agent run) back to an executor via
AgentExecutorRequest or from_response, the executor's session still
held a service_session_id (previous_response_id) from the prior run.
The API then received the same function-call items twice — once from
previous_response_id (server-stored) and once from the explicit input —
causing: "Duplicate item found with id fc_...".

AgentExecutor.run (when should_respond=True) and from_response now
reset self._session.service_session_id = None before running so that
explicit input is the sole source of conversation context.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* small improvements in text reasoning

* refactor: add reset_service_session to AgentExecutorRequest for explicit history replay

Replace the implicit 'always clear service_session_id when should_respond=True'
with an explicit opt-in field on AgentExecutorRequest.

The old approach used should_respond=True as a proxy for 'full history replay',
but that conflates two distinct intents:
- Orchestrations group chat sends should_respond=True with an empty/single-message
  list (not a full replay) — unnecessarily clearing service_session_id.
- HITL / feedback coordinators send the full prior conversation and truly need
  a fresh service session ID to avoid duplicate-item API errors.

Changes:
- Add AgentExecutorRequest.reset_service_session: bool = False
- AgentExecutor.run only clears service_session_id when this flag is True
- AgentExecutor.from_response unchanged (always clears; always full conversation)
- Set reset_service_session=True in all full-history-replay call sites:
  agents_with_HITL.py, azure_chat_agents_tool_calls_with_feedback.py,
  autogen-migration round-robin coordinator, tau2 runner
- Update _FullHistoryReplayCoordinator test helper to pass the flag

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* comment update

* fixes from feedback

* fix test

* reverted changes to agent executor

* fix: remove reset_service_session from tau2 runner

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* two other reverts

* fix sample

---------

Co-authored-by: Giles Odigwe <79032838+giles17@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-02-19 21:02:20 +00:00


# Copyright (c) Microsoft. All rights reserved.
import asyncio
import os
from collections.abc import AsyncIterable
from dataclasses import dataclass, field
from agent_framework import (
    AgentExecutorRequest,
    AgentExecutorResponse,
    AgentResponse,
    AgentResponseUpdate,
    Executor,
    Message,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowEvent,
    handler,
    response_handler,
)
from agent_framework.azure import AzureOpenAIResponsesClient
from azure.identity import AzureCliCredential
from dotenv import load_dotenv
from typing_extensions import Never
# Load environment variables from .env file
load_dotenv()
"""
Sample: Azure AI Agents in workflow with human feedback

Pipeline layout:
writer_agent -> Coordinator -> writer_agent -> Coordinator -> final_editor_agent -> Coordinator -> output

The writer agent drafts marketing copy. A custom executor emits a request_info event (type='request_info') so a
human can comment, then relays the human guidance back into the conversation before the final editor agent
produces the polished output.

Demonstrates:
- Capturing agent responses in a custom executor.
- Emitting request_info events (type='request_info') to request human input.
- Handling human feedback and routing it to the appropriate agents.

Prerequisites:
- AZURE_AI_PROJECT_ENDPOINT must be your Azure AI Foundry Agent Service (V2) project endpoint.
- Azure OpenAI configured for AzureOpenAIResponsesClient with required environment variables,
  including AZURE_AI_MODEL_DEPLOYMENT_NAME (read in main()).
- Authentication via azure-identity. Run `az login` before executing.
"""


@dataclass
class DraftFeedbackRequest:
    """Payload sent for human review."""

    prompt: str = ""
    conversation: list[Message] = field(default_factory=lambda: [])


class Coordinator(Executor):
    """Bridge between the writer agent, human feedback, and final editor."""

    def __init__(self, id: str, writer_name: str, final_editor_name: str) -> None:
        super().__init__(id)
        self.writer_name = writer_name
        self.final_editor_name = final_editor_name

    @handler
    async def on_writer_response(
        self,
        draft: AgentExecutorResponse,
        ctx: WorkflowContext[Never, AgentResponse],
    ) -> None:
        """Handle responses from the writer and final editor agents."""
        if draft.executor_id == self.final_editor_name:
            # No further processing is needed when the final editor has responded.
            return

        # Writer agent response; request human feedback.
        # Preserve the full conversation so that the final editor has context.
        conversation: list[Message]
        if draft.full_conversation is not None:
            conversation = list(draft.full_conversation)
        else:
            conversation = list(draft.agent_response.messages)

        prompt = (
            "Review the draft from the writer and provide a short directional note "
            "(tone tweaks, must-have detail, target audience, etc.). "
            "Keep it under 30 words."
        )
        await ctx.request_info(
            request_data=DraftFeedbackRequest(prompt=prompt, conversation=conversation),
            response_type=str,
        )

    @response_handler
    async def on_human_feedback(
        self,
        original_request: DraftFeedbackRequest,
        feedback: str,
        ctx: WorkflowContext[AgentExecutorRequest],
    ) -> None:
        """Process human feedback and forward to the appropriate agent."""
        note = feedback.strip()
        if note.lower() == "approve":
            # Human approved the draft as-is; forward it unchanged.
            await ctx.send_message(
                AgentExecutorRequest(
                    messages=original_request.conversation + [Message("user", text="The draft is approved as-is.")],
                    should_respond=True,
                ),
                target_id=self.final_editor_name,
            )
            return

        # Human provided feedback; prompt the writer to revise.
        conversation: list[Message] = list(original_request.conversation)
        instruction = (
            "A human reviewer shared the following guidance:\n"
            f"{note or 'No specific guidance provided.'}\n\n"
            "Rewrite the draft from the previous assistant message into a polished final version. "
            "Keep the response under 120 words and reflect any requested tone adjustments."
        )
        conversation.append(Message("user", text=instruction))
        await ctx.send_message(
            AgentExecutorRequest(messages=conversation, should_respond=True),
            target_id=self.writer_name,
        )


async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, str] | None:
    """Process events from the workflow stream to capture human feedback requests."""
    # Track the last author to format streaming output.
    last_author: str | None = None

    requests: list[tuple[str, DraftFeedbackRequest]] = []
    async for event in stream:
        if event.type == "request_info" and isinstance(event.data, DraftFeedbackRequest):
            requests.append((event.request_id, event.data))
        elif event.type == "output" and isinstance(event.data, AgentResponseUpdate):
            # This workflow should only produce AgentResponseUpdate as outputs.
            # Streaming updates from an agent will be consecutive, because no two agents run simultaneously
            # in this workflow. So we can use last_author to format output nicely.
            update = event.data
            author = update.author_name
            if author != last_author:
                if last_author is not None:
                    print()  # Newline between different authors
                print(f"{author}: {update.text}", end="", flush=True)
                last_author = author
            else:
                print(update.text, end="", flush=True)

    # Handle any pending human feedback requests.
    if requests:
        responses: dict[str, str] = {}
        for request_id, _ in requests:
            print("\nProvide guidance for the editor (or 'approve' to accept the draft).")
            answer = input("Human feedback: ").strip()  # noqa: ASYNC250
            if answer.lower() == "exit":
                print("Exiting...")
                return None
            responses[request_id] = answer
        return responses
    return None


async def main() -> None:
    """Run the workflow and bridge human feedback between two agents."""
    # Create the agents
    writer_agent = AzureOpenAIResponsesClient(
        project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
        deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
        credential=AzureCliCredential(),
    ).as_agent(
        name="writer_agent",
        instructions=("You are a marketing writer."),
        tool_choice="required",
    )

    final_editor_agent = AzureOpenAIResponsesClient(
        project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
        deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
        credential=AzureCliCredential(),
    ).as_agent(
        name="final_editor_agent",
        instructions=(
            "You are an editor who polishes marketing copy after human approval. "
            "Correct any legal or factual issues. Return the final version even if no changes are made. "
        ),
    )

    # Create the executor
    coordinator = Coordinator(
        id="coordinator",
        writer_name=writer_agent.name,  # type: ignore
        final_editor_name=final_editor_agent.name,  # type: ignore
    )

    # Build the workflow.
    workflow = (
        WorkflowBuilder(start_executor=writer_agent)
        .add_edge(writer_agent, coordinator)
        .add_edge(coordinator, writer_agent)
        .add_edge(final_editor_agent, coordinator)
        .add_edge(coordinator, final_editor_agent)
        .build()
    )

    print(
        "Interactive mode. When prompted, provide a short feedback note for the editor.",
        flush=True,
    )

    # Initiate the first run of the workflow.
    # Runs are not isolated; state is preserved across multiple calls to run.
    stream = workflow.run(
        "Create a short launch blurb for the LumenX desk lamp. Emphasize adjustability and warm lighting.",
        stream=True,
    )
    pending_responses = await process_event_stream(stream)
    while pending_responses is not None:
        # Run the workflow until there is no more human feedback to provide,
        # in which case this workflow completes.
        stream = workflow.run(stream=True, responses=pending_responses)
        pending_responses = await process_event_stream(stream)

    print("\nWorkflow complete.")


if __name__ == "__main__":
    asyncio.run(main())