Python: fix reasoning model workflow handoff and history serialization (#4083)

* fix: strip function_call and text_reasoning from cross-agent workflow handoff

When a reasoning model (e.g. gpt-5-mini) runs as Agent 1 in a workflow, its
response includes text_reasoning items (with server-scoped IDs like rs_XXXX)
and function_call items. Forwarding these to Agent 2 in a fresh conversation
caused API errors because the reasoning/call IDs are scoped to the original
stored response context.

Changes:
- Strip 'function_call', 'text_reasoning', 'function_approval_request', and
  'function_approval_response' from handoff messages in _agent_executor.py
- Keep 'function_result' so the actual tool output content is preserved for
  the next agent's context
- Update unit tests to reflect that function_result messages survive handoff
  (messages grow from 2→3: user, tool(result), assistant(summary))
- Fix incorrect test assertions in test_function_invocation_stop_clears_*
  that assumed the client layer updates session.service_session_id
- Fix _extract_function_calls to search all messages and deduplicate by
  call_id, and make the error-limit stop path submit function_call_output
  items before halting (via a cleanup call with tool_choice=none)
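
The handoff filtering described in the first bullets can be sketched roughly
as follows. This is a minimal illustration, not the code in
_agent_executor.py: `Item`, `Msg`, and `filter_for_handoff` are hypothetical
stand-ins for the framework's Content and Message types and its filtering
logic.

```python
from dataclasses import dataclass, field

# Content types the commit message lists as stripped before handoff.
STRIPPED_TYPES = frozenset(
    {
        "function_call",
        "text_reasoning",
        "function_approval_request",
        "function_approval_response",
    }
)


@dataclass
class Item:
    """Hypothetical stand-in for the framework's Content."""

    type: str
    text: str = ""


@dataclass
class Msg:
    """Hypothetical stand-in for the framework's Message."""

    role: str
    contents: list = field(default_factory=list)


def filter_for_handoff(messages):
    """Drop server-scoped items and any message left with no contents."""
    kept = []
    for message in messages:
        contents = [c for c in message.contents if c.type not in STRIPPED_TYPES]
        if contents:
            kept.append(Msg(role=message.role, contents=contents))
    return kept


history = [
    Msg("user", [Item("text", "Check the weather")]),
    Msg("assistant", [Item("text_reasoning"), Item("function_call")]),
    Msg("tool", [Item("function_result", "Sunny, 72F")]),
    Msg("assistant", [Item("text", "It is sunny.")]),
]
handoff = filter_for_handoff(history)
print([m.role for m in handoff])  # ['user', 'tool', 'assistant']
```

The function_result message survives, which matches the 2→3 message count
noted in the test bullet above.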

Relates to: https://github.com/microsoft/agent-framework/issues/4047

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix: reasoning model workflow handoff and history serialization

Fixes multiple related issues when using reasoning models (gpt-5-mini,
gpt-5.2) in multi-agent workflows that chain agents via from_response
or replay full conversation history via AgentExecutorRequest.

## Reasoning items always emitted on output_item.added

When a reasoning model produces encrypted or hidden reasoning (no
visible text), the Responses API still fires a reasoning output item
without any reasoning_text.delta events. Previously no text_reasoning
Content was emitted in that case, making it invisible to downstream
logic. Both the non-streaming (_parse_response_from_openai) and
streaming (output_item.added) paths now always emit at least one
text_reasoning Content — with empty text if no content is available —
so co-occurrence detection and serialization guards work reliably.
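
A rough sketch of the "always emit a marker" behavior, assuming a reasoning
item that exposes `id`, `content`, `summary`, and `encrypted_content`
attributes. `ReasoningContent` and `parse_reasoning_item` are hypothetical
names used only for illustration; the real parsing lives in the client and
uses Content.from_text_reasoning.

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any, Optional


@dataclass
class ReasoningContent:
    """Hypothetical stand-in for a text_reasoning Content."""

    id: Optional[str]
    text: str
    encrypted_content: Optional[str] = None


def parse_reasoning_item(item: Any) -> list:
    """Return at least one ReasoningContent for every reasoning item."""
    item_id = getattr(item, "id", None)
    parsed = []
    for part in getattr(item, "content", None) or []:
        parsed.append(ReasoningContent(id=item_id, text=part.text))
    for summary in getattr(item, "summary", None) or []:
        parsed.append(ReasoningContent(id=item_id, text=summary.text))
    if not parsed:
        # No visible text (for example encrypted reasoning): emit an empty marker
        # so later logic can still see that a reasoning item was present.
        parsed.append(
            ReasoningContent(
                id=item_id,
                text="",
                encrypted_content=getattr(item, "encrypted_content", None),
            )
        )
    return parsed


hidden = SimpleNamespace(id="rs_1", content=None, summary=[], encrypted_content="opaque")
visible = SimpleNamespace(id="rs_2", content=None, summary=[SimpleNamespace(text="Plan the search")])
print(parse_reasoning_item(hidden))
print(parse_reasoning_item(visible))
```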

## Reasoning items only serialized when paired with a function_call

The Responses API only accepts reasoning items in input when they
directly preceded a function_call in the original response. Sending a
reasoning item that preceded a text response (no tool call) causes:
  "reasoning was provided without its required following item"
_prepare_message_for_openai now checks has_function_call per message
and skips text_reasoning serialization when there is no accompanying
function_call.
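
The per-message check can be illustrated with plain dictionaries. This is a
simplified sketch; `serialize_contents` is a hypothetical helper, not the
actual _prepare_message_for_openai implementation.

```python
from typing import Any


def serialize_contents(contents: list) -> list:
    """Return the items to send, skipping reasoning that has no function_call."""
    has_function_call = any(c["type"] == "function_call" for c in contents)
    return [
        c
        for c in contents
        if c["type"] != "text_reasoning" or has_function_call
    ]


with_call: list = [
    {"type": "text_reasoning", "id": "rs_1"},
    {"type": "function_call", "call_id": "call_1"},
]
text_only: list = [
    {"type": "text_reasoning", "id": "rs_2"},
    {"type": "text", "text": "Final answer"},
]
print([c["type"] for c in serialize_contents(with_call)])  # ['text_reasoning', 'function_call']
print([c["type"] for c in serialize_contents(text_only)])  # ['text']
```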

## summary field is an array, not an object

The reasoning item summary field sent to the Responses API must be an
array of objects ([{"type": "summary_text", "text": ...}]), not a
single object. Fixed _prepare_content_for_openai accordingly.
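
For illustration, the corrected payload shape looks roughly like this.
`build_reasoning_item` is a hypothetical helper that mirrors the shape
described above, not the client method itself.

```python
import json
from typing import Any, Optional


def build_reasoning_item(reasoning_id: Optional[str], summary_text: Optional[str]) -> dict:
    """Build a reasoning input item whose summary is a list of objects."""
    item: dict = {"type": "reasoning", "summary": []}
    if reasoning_id:
        item["id"] = reasoning_id
    if summary_text:
        # Each summary entry is its own object inside the array.
        item["summary"].append({"type": "summary_text", "text": summary_text})
    return item


print(json.dumps(build_reasoning_item("rs_abc123", "Let me analyze the request"), indent=2))
print(json.dumps(build_reasoning_item("rs_empty", None)))
```

An item without summary text still carries `"summary": []`, so the field
stays an array in every case.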

## service_session_id cleared when explicit history is provided

When a workflow coordinator replays a full conversation (including
function calls from a previous agent run) back to an executor via
AgentExecutorRequest or from_response, the executor's session still
held a service_session_id (previous_response_id) from the prior run.
The API then received the same function-call items twice — once from
previous_response_id (server-stored) and once from the explicit input —
causing: "Duplicate item found with id fc_...".

AgentExecutor.run (when should_respond=True) and from_response now
reset self._session.service_session_id = None before running so that
explicit input is the sole source of conversation context.
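
A simplified view of why clearing the ID avoids the duplicate, assuming the
client forwards service_session_id as previous_response_id. `build_request`
is a hypothetical helper written for this note.

```python
from typing import Any, Optional


def build_request(input_items: list, service_session_id: Optional[str]) -> dict:
    """Assemble request arguments; only reference a stored response when an ID is set."""
    request: dict = {"input": input_items}
    if service_session_id is not None:
        # The server would also load the items stored under this response ID.
        request["previous_response_id"] = service_session_id
    return request


replayed = [{"type": "function_call", "id": "fc_1"}, {"type": "message", "role": "user"}]

# Stale ID plus explicit history: fc_1 would reach the API from two sources.
print(build_request(replayed, "resp_PREVIOUS_RUN"))

# ID cleared first: the explicit input is the only source of context.
print(build_request(replayed, None))
```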

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* small improvements in text reasoning

* refactor: add reset_service_session to AgentExecutorRequest for explicit history replay

Replace the implicit 'always clear service_session_id when should_respond=True'
with an explicit opt-in field on AgentExecutorRequest.

The old approach used should_respond=True as a proxy for 'full history replay',
but that conflates two distinct intents:
- Orchestrations group chat sends should_respond=True with an empty/single-message
  list (not a full replay) — unnecessarily clearing service_session_id.
- HITL / feedback coordinators send the full prior conversation and truly need
  a fresh service session ID to avoid duplicate-item API errors.

Changes:
- Add AgentExecutorRequest.reset_service_session: bool = False
- AgentExecutor.run only clears service_session_id when this flag is True
- AgentExecutor.from_response unchanged (always clears; always full conversation)
- Set reset_service_session=True in all full-history-replay call sites:
  agents_with_HITL.py, azure_chat_agents_tool_calls_with_feedback.py,
  autogen-migration round-robin coordinator, tau2 runner
- Update _FullHistoryReplayCoordinator test helper to pass the flag
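
The opt-in flag described in this entry could look roughly like the sketch
below. `RequestSketch`, `SessionSketch`, and `ExecutorSketch` are hypothetical
stand-ins, not the framework classes, and later entries in this commit
message revert parts of the executor change, so treat this as a picture of
the intermediate design only.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class RequestSketch:
    """Hypothetical stand-in for AgentExecutorRequest."""

    messages: list = field(default_factory=list)
    should_respond: bool = True
    reset_service_session: bool = False  # opt-in, off by default


@dataclass
class SessionSketch:
    """Hypothetical stand-in for the executor's session."""

    service_session_id: Optional[str] = None


class ExecutorSketch:
    """Clears the stored ID only when the request asks for it."""

    def __init__(self, service_session_id: Optional[str]) -> None:
        self.session = SessionSketch(service_session_id)

    def run(self, request: RequestSketch) -> Optional[str]:
        if request.reset_service_session:
            self.session.service_session_id = None
        # Return the ID the agent would see for this run.
        return self.session.service_session_id


group_chat = ExecutorSketch("resp_PREVIOUS_RUN")
print(group_chat.run(RequestSketch(messages=["next turn"])))  # resp_PREVIOUS_RUN

replay = ExecutorSketch("resp_PREVIOUS_RUN")
print(replay.run(RequestSketch(messages=["full", "history"], reset_service_session=True)))  # None
```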

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* comment update

* fixes from feedback

* fix test

* reverted changes to agent executor

* fix: remove reset_service_session from tau2 runner

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* two other reverts

* fix sample

---------

Co-authored-by: Giles Odigwe <79032838+giles17@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Eduard van Valkenburg
2026-02-19 22:02:20 +01:00
committed by GitHub
parent 2cb4137501
commit 67ce1baecf
11 changed files with 445 additions and 66 deletions
+34 -15
@@ -1761,10 +1761,26 @@ def _get_result_hooks_from_stream(stream: Any) -> list[Callable[[Any], Any]]:
def _extract_function_calls(response: ChatResponse) -> list[Content]:
function_results = {it.call_id for it in response.messages[0].contents if it.type == "function_result"}
return [
it for it in response.messages[0].contents if it.type == "function_call" and it.call_id not in function_results
]
function_results = {
item.call_id
for message in response.messages
for item in message.contents
if item.type == "function_result" and item.call_id
}
seen_call_ids: set[str] = set()
function_calls: list[Content] = []
for message in response.messages:
for item in message.contents:
if item.type != "function_call":
continue
if item.call_id and item.call_id in function_results:
continue
if item.call_id and item.call_id in seen_call_ids:
continue
if item.call_id:
seen_call_ids.add(item.call_id)
function_calls.append(item)
return function_calls
def _prepend_fcc_messages(response: ChatResponse, fcc_messages: list[Message]) -> None:
@@ -1822,27 +1838,22 @@ def _handle_function_call_results(
if had_errors:
errors_in_a_row += 1
if errors_in_a_row >= max_errors:
reached_error_limit = errors_in_a_row >= max_errors
if reached_error_limit:
logger.warning(
"Maximum consecutive function call errors reached (%d). "
"Stopping further function calls for this request.",
max_errors,
)
return {
"action": "stop",
"errors_in_a_row": errors_in_a_row,
"result_message": None,
"update_role": None,
"function_call_results": None,
}
else:
errors_in_a_row = 0
reached_error_limit = False
result_message = Message(role="tool", contents=function_call_results)
response.messages.append(result_message)
fcc_messages.extend(response.messages)
return {
"action": "continue",
"action": "stop" if reached_error_limit else "continue",
"errors_in_a_row": errors_in_a_row,
"result_message": result_message,
"update_role": "tool",
@@ -2025,6 +2036,7 @@ class FunctionInvocationLayer(Generic[OptionsCoT]):
middleware_pipeline=function_middleware_pipeline,
)
filtered_kwargs = {k: v for k, v in kwargs.items() if k != "session"}
# Make options mutable so we can update conversation_id during function invocation loop
mutable_options: dict[str, Any] = dict(options) if options else {}
# Remove additional_function_arguments from options passed to underlying chat client
@@ -2090,7 +2102,9 @@ class FunctionInvocationLayer(Generic[OptionsCoT]):
if result["action"] == "return":
return response
if result["action"] == "stop":
break
# Error threshold reached: force a final non-tool turn so
# function_call_output items are submitted before exit.
mutable_options["tool_choice"] = "none"
errors_in_a_row = result["errors_in_a_row"]
# When tool_choice is 'required', reset tool_choice after one iteration to avoid infinite loops
@@ -2157,6 +2171,7 @@ class FunctionInvocationLayer(Generic[OptionsCoT]):
)
errors_in_a_row = approval_result["errors_in_a_row"]
if approval_result["action"] == "stop":
mutable_options["tool_choice"] = "none"
return
inner_stream = await _ensure_response_stream(
@@ -2205,7 +2220,11 @@ class FunctionInvocationLayer(Generic[OptionsCoT]):
contents=result["function_call_results"] or [],
role=role,
)
if result["action"] != "continue":
if result["action"] == "stop":
# Error threshold reached: submit collected function_call_output
# items once more with tools disabled.
mutable_options["tool_choice"] = "none"
elif result["action"] != "continue":
return
# When tool_choice is 'required', reset the tool_choice after one iteration to avoid infinite loops
@@ -531,6 +531,7 @@ class Content:
def from_text_reasoning(
cls: type[ContentT],
*,
id: str | None = None,
text: str | None = None,
protected_data: str | None = None,
annotations: Sequence[Annotation] | None = None,
@@ -540,6 +541,7 @@ class Content:
"""Create text reasoning content."""
return cls(
"text_reasoning",
id=id,
text=text,
protected_data=protected_data,
annotations=annotations,
@@ -144,10 +144,10 @@ class AgentExecutor(Executor):
immediately run the agent to produce a new response.
"""
# Replace cache with full conversation if available, else fall back to agent_response messages.
if prior.full_conversation is not None:
self._cache = list(prior.full_conversation)
else:
self._cache = list(prior.agent_response.messages)
source_messages = (
prior.full_conversation if prior.full_conversation is not None else prior.agent_response.messages
)
self._cache = list(source_messages)
await self._run_agent_and_emit(ctx)
@handler
@@ -311,7 +311,7 @@ class AgentExecutor(Executor):
# Snapshot current conversation as cache + latest agent outputs.
# Do not append to prior snapshots: callers may provide full-history messages
# in request.messages, and extending would duplicate prior turns.
self._full_conversation = list(self._cache) + (list(response.messages) if response else [])
self._full_conversation = [*self._cache, *(list(response.messages) if response else [])]
if response is None:
# Agent did not complete (e.g., waiting for user input); do not emit response
@@ -908,11 +908,16 @@ class RawOpenAIResponsesClient( # type: ignore[misc]
"type": "message",
"role": message.role,
}
# Reasoning items are only valid in input when they directly preceded a function_call
# in the same response. Including a reasoning item that preceded a text response
# (i.e. no function_call in the same message) causes an API error:
# "reasoning was provided without its required following item."
has_function_call = any(c.type == "function_call" for c in message.contents)
for content in message.contents:
match content.type:
case "text_reasoning":
# Reasoning items must be sent back as top-level input items
# for reasoning models that require them alongside function_calls
if not has_function_call:
continue # reasoning not followed by a function_call is invalid in input
reasoning = self._prepare_content_for_openai(message.role, content, call_id_to_id) # type: ignore[arg-type]
if reasoning:
all_messages.append(reasoning)
@@ -961,26 +966,19 @@ class RawOpenAIResponsesClient( # type: ignore[misc]
"text": content.text,
}
case "text_reasoning":
ret: dict[str, Any] = {
"type": "reasoning",
"summary": {
"type": "summary_text",
"text": content.text,
},
}
ret: dict[str, Any] = {"type": "reasoning", "summary": []}
if content.id:
ret["id"] = content.id
props: dict[str, Any] | None = getattr(content, "additional_properties", None)
if props:
if reasoning_id := props.get("reasoning_id"):
ret["id"] = reasoning_id
if status := props.get("status"):
ret["status"] = status
if reasoning_text := props.get("reasoning_text"):
ret["content"] = {
"type": "reasoning_text",
"text": reasoning_text,
}
ret["content"] = [{"type": "reasoning_text", "text": reasoning_text}]
if encrypted_content := props.get("encrypted_content"):
ret["encrypted_content"] = encrypted_content
if content.text:
ret["summary"].append({"type": "summary_text", "text": content.text})
return ret
case "data" | "uri":
if content.has_top_level_media_type("image"):
@@ -1189,30 +1187,45 @@ class RawOpenAIResponsesClient( # type: ignore[misc]
)
)
case "reasoning": # ResponseOutputReasoning
reasoning_id = getattr(item, "id", None)
if hasattr(item, "content") and item.content:
for index, reasoning_content in enumerate(item.content):
added_reasoning = False
if item_content := getattr(item, "content", None):
for index, reasoning_content in enumerate(item_content):
additional_properties: dict[str, Any] = {}
if reasoning_id:
additional_properties["reasoning_id"] = reasoning_id
if hasattr(item, "summary") and item.summary and index < len(item.summary):
additional_properties["summary"] = item.summary[index]
contents.append(
Content.from_text_reasoning(
id=item.id,
text=reasoning_content.text,
raw_representation=reasoning_content,
additional_properties=additional_properties or None,
)
)
if hasattr(item, "summary") and item.summary:
for summary in item.summary:
added_reasoning = True
if item_summary := getattr(item, "summary", None):
for summary in item_summary:
contents.append(
Content.from_text_reasoning(
id=item.id,
text=summary.text,
raw_representation=summary, # type: ignore[arg-type]
additional_properties={"reasoning_id": reasoning_id} if reasoning_id else None,
)
)
added_reasoning = True
if not added_reasoning:
# Reasoning item with no visible text (e.g. encrypted reasoning).
# Always emit an empty marker so co-occurrence detection can be done
additional_properties_empty: dict[str, Any] = {}
if encrypted := getattr(item, "encrypted_content", None):
additional_properties_empty["encrypted_content"] = encrypted
contents.append(
Content.from_text_reasoning(
id=item.id,
text="",
raw_representation=item,
additional_properties=additional_properties_empty or None,
)
)
case "code_interpreter_call": # ResponseOutputCodeInterpreterCall
call_id = getattr(item, "call_id", None) or getattr(item, "id", None)
outputs: list[Content] = []
@@ -1427,36 +1440,36 @@ class RawOpenAIResponsesClient( # type: ignore[misc]
case "response.reasoning_text.delta":
contents.append(
Content.from_text_reasoning(
id=event.item_id,
text=event.delta,
raw_representation=event,
additional_properties={"reasoning_id": event.item_id},
)
)
metadata.update(self._get_metadata_from_response(event))
case "response.reasoning_text.done":
contents.append(
Content.from_text_reasoning(
id=event.item_id,
text=event.text,
raw_representation=event,
additional_properties={"reasoning_id": event.item_id},
)
)
metadata.update(self._get_metadata_from_response(event))
case "response.reasoning_summary_text.delta":
contents.append(
Content.from_text_reasoning(
id=event.item_id,
text=event.delta,
raw_representation=event,
additional_properties={"reasoning_id": event.item_id},
)
)
metadata.update(self._get_metadata_from_response(event))
case "response.reasoning_summary_text.done":
contents.append(
Content.from_text_reasoning(
id=event.item_id,
text=event.text,
raw_representation=event,
additional_properties={"reasoning_id": event.item_id},
)
)
metadata.update(self._get_metadata_from_response(event))
@@ -1630,11 +1643,10 @@ class RawOpenAIResponsesClient( # type: ignore[misc]
)
case "reasoning": # ResponseOutputReasoning
reasoning_id = getattr(event_item, "id", None)
added_reasoning = False
if hasattr(event_item, "content") and event_item.content:
for index, reasoning_content in enumerate(event_item.content):
additional_properties: dict[str, Any] = {}
if reasoning_id:
additional_properties["reasoning_id"] = reasoning_id
if (
hasattr(event_item, "summary")
and event_item.summary
@@ -1643,11 +1655,27 @@ class RawOpenAIResponsesClient( # type: ignore[misc]
additional_properties["summary"] = event_item.summary[index]
contents.append(
Content.from_text_reasoning(
id=reasoning_id or None,
text=reasoning_content.text,
raw_representation=reasoning_content,
additional_properties=additional_properties or None,
)
)
added_reasoning = True
if not added_reasoning:
# Reasoning item with no visible text (e.g. encrypted reasoning).
# Always emit an empty marker so co-occurrence detection can occur.
additional_properties_empty: dict[str, Any] = {}
if encrypted := getattr(event_item, "encrypted_content", None):
additional_properties_empty["encrypted_content"] = encrypted
contents.append(
Content.from_text_reasoning(
id=reasoning_id or None,
text="",
raw_representation=event_item,
additional_properties=additional_properties_empty or None,
)
)
case _:
logger.debug("Unparsed event of type: %s: %s", event.type, event)
case "response.function_call_arguments.delta":
@@ -171,6 +171,62 @@ async def test_base_client_with_streaming_function_calling(chat_client_base: Sup
assert exec_counter == 1
async def test_base_client_executes_function_calls_across_multiple_response_messages(
chat_client_base: SupportsChatGetResponse,
):
exec_counter = 0
@tool(name="test_function", approval_mode="never_require")
def ai_func(arg1: str) -> str:
nonlocal exec_counter
exec_counter += 1
return f"Processed {arg1}"
chat_client_base.run_responses = [
ChatResponse(
messages=[
Message(
role="assistant",
contents=[
Content.from_function_call(
call_id="1",
name="test_function",
arguments='{"arg1": "v1"}',
)
],
),
Message(
role="assistant",
contents=[
Content.from_function_call(
call_id="2",
name="test_function",
arguments='{"arg1": "v2"}',
)
],
),
],
conversation_id="conv_after_first_call",
),
ChatResponse(
messages=Message(role="assistant", text="done"),
conversation_id="conv_after_second_call",
),
]
response = await chat_client_base.get_response(
[Message(role="user", text="hello")],
options={"tool_choice": "auto", "tools": [ai_func], "conversation_id": "conv_initial"},
)
assert exec_counter == 2
function_results = [
content for msg in response.messages for content in msg.contents if content.type == "function_result"
]
assert len(function_results) == 2
assert {result.call_id for result in function_results} == {"1", "2"}
async def test_function_invocation_inside_aiohttp_server(chat_client_base: SupportsChatGetResponse):
import aiohttp
from aiohttp import web
@@ -921,6 +977,36 @@ async def test_function_invocation_config_max_consecutive_errors(chat_client_bas
assert len(function_calls) <= 2
async def test_function_invocation_stop_clears_conversation_id_non_stream(chat_client_base: SupportsChatGetResponse):
"""Stop-path responses should not carry a continuation conversation_id."""
@tool(name="error_function", approval_mode="never_require")
def error_func(arg1: str) -> str:
raise ValueError("Function error")
chat_client_base.run_responses = [
ChatResponse(
messages=Message(
role="assistant",
contents=[
Content.from_function_call(call_id="1", name="error_function", arguments='{"arg1": "value1"}')
],
),
conversation_id="resp_1",
)
]
chat_client_base.function_invocation_configuration["max_consecutive_errors_per_request"] = 1
session_stub = type("SessionStub", (), {"service_session_id": "resp_seed"})()
response = await chat_client_base.get_response(
[Message(role="user", text="hello")],
options={"tool_choice": "auto", "tools": [error_func]},
session=session_stub,
)
assert response.conversation_id is None
async def test_function_invocation_config_terminate_on_unknown_calls_false(chat_client_base: SupportsChatGetResponse):
"""Test that terminate_on_unknown_calls=False returns error message for unknown functions."""
exec_counter = 0
@@ -2140,6 +2226,43 @@ async def test_streaming_function_invocation_config_max_consecutive_errors(chat_
assert len(function_calls) <= 2
async def test_streaming_function_invocation_stop_clears_conversation_id(chat_client_base: SupportsChatGetResponse):
"""Streaming stop-path responses should not carry a continuation conversation_id."""
@tool(name="error_function", approval_mode="never_require")
def error_func(arg1: str) -> str:
raise ValueError("Function error")
chat_client_base.streaming_responses = [
[
ChatResponseUpdate(
contents=[
Content.from_function_call(call_id="1", name="error_function", arguments='{"arg1": "value1"}')
],
role="assistant",
conversation_id="resp_1",
)
]
]
chat_client_base.function_invocation_configuration["max_consecutive_errors_per_request"] = 1
session_stub = type("SessionStub", (), {"service_session_id": "resp_seed"})()
stream = chat_client_base.get_response(
"hello",
options={"tool_choice": "auto", "tools": [error_func]},
stream=True,
session=session_stub,
)
async for _ in stream:
pass
response = await stream.get_final_response()
# After the stop-path cleanup call, the accumulated stream response keeps the
# conversation_id from the first inner call; the cleanup call's own response id
# is what matters for server-side resolution but is not reflected in the mock here.
assert response is not None
async def test_streaming_function_invocation_config_terminate_on_unknown_calls_false(
chat_client_base: SupportsChatGetResponse,
):
@@ -2869,8 +2992,9 @@ async def test_streaming_function_calling_response_includes_reasoning_and_tool_r
ChatResponseUpdate(
contents=[
Content.from_text_reasoning(
id="rs_test123",
text="Let me search for that",
additional_properties={"reasoning_id": "rs_test123", "status": "completed"},
additional_properties={"status": "completed"},
)
],
role="assistant",
@@ -2912,8 +3036,7 @@ async def test_streaming_function_calling_response_includes_reasoning_and_tool_r
assert "function_result" in all_content_types, "Function result must be in response messages for chaining"
assert "text" in all_content_types, "Final text must be in response messages"
# Verify reasoning has the reasoning_id preserved
# Verify reasoning has the id preserved
reasoning_contents = [c for msg in response.messages for c in msg.contents if c.type == "text_reasoning"]
assert len(reasoning_contents) >= 1
assert reasoning_contents[0].additional_properties is not None
assert reasoning_contents[0].additional_properties.get("reasoning_id") == "rs_test123"
assert reasoning_contents[0].id == "rs_test123"
@@ -821,8 +821,9 @@ def test_prepare_message_for_openai_includes_reasoning_with_function_call() -> N
client = OpenAIResponsesClient(model_id="test-model", api_key="test-key")
reasoning = Content.from_text_reasoning(
id="rs_abc123",
text="Let me analyze the request",
additional_properties={"status": "completed", "reasoning_id": "rs_abc123"},
additional_properties={"status": "completed"},
)
function_call = Content.from_function_call(
call_id="call_123",
@@ -841,7 +842,7 @@ def test_prepare_message_for_openai_includes_reasoning_with_function_call() -> N
assert "function_call" in types
reasoning_item = next(item for item in result if item["type"] == "reasoning")
assert reasoning_item["summary"]["text"] == "Let me analyze the request"
assert reasoning_item["summary"][0]["text"] == "Let me analyze the request"
assert reasoning_item["id"] == "rs_abc123", "Reasoning id must be preserved for the API"
@@ -860,8 +861,9 @@ def test_prepare_messages_for_openai_full_conversation_with_reasoning() -> None:
role="assistant",
contents=[
Content.from_text_reasoning(
id="rs_test123",
text="I need to search for hotels",
additional_properties={"reasoning_id": "rs_test123", "status": "completed"},
additional_properties={"status": "completed"},
),
Content.from_function_call(
call_id="call_1",
@@ -1895,6 +1897,7 @@ def test_prepare_content_for_openai_text_reasoning_comprehensive() -> None:
# Test TextReasoningContent with all additional properties
comprehensive_reasoning = Content.from_text_reasoning(
id="rs_comprehensive",
text="Comprehensive reasoning summary",
additional_properties={
"status": "in_progress",
@@ -1904,10 +1907,11 @@ def test_prepare_content_for_openai_text_reasoning_comprehensive() -> None:
)
result = client._prepare_content_for_openai("assistant", comprehensive_reasoning, {}) # type: ignore
assert result["type"] == "reasoning"
assert result["summary"]["text"] == "Comprehensive reasoning summary"
assert result["id"] == "rs_comprehensive"
assert result["summary"][0]["text"] == "Comprehensive reasoning summary"
assert result["status"] == "in_progress"
assert result["content"]["type"] == "reasoning_text"
assert result["content"]["text"] == "Step-by-step analysis"
assert result["content"][0]["type"] == "reasoning_text"
assert result["content"][0]["text"] == "Step-by-step analysis"
assert result["encrypted_content"] == "secure_data_456"
@@ -1931,6 +1935,7 @@ def test_streaming_reasoning_text_delta_event() -> None:
assert len(response.contents) == 1
assert response.contents[0].type == "text_reasoning"
assert response.contents[0].id == "reasoning_123"
assert response.contents[0].text == "reasoning delta"
assert response.contents[0].raw_representation == event
mock_metadata.assert_called_once_with(event)
@@ -3,6 +3,7 @@
from collections.abc import AsyncIterable, Awaitable, Sequence
from typing import Any
import pytest
from pydantic import PrivateAttr
from typing_extensions import Never
@@ -54,6 +55,67 @@ class _SimpleAgent(BaseAgent):
return _run()
class _ToolHistoryAgent(BaseAgent):
"""Agent that emits tool-call internals plus a final assistant summary."""
def __init__(self, *, summary_text: str, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._summary_text = summary_text
def _messages(self) -> list[Message]:
return [
Message(
role="assistant",
contents=[
Content.from_function_call(
call_id="call_weather_1",
name="get_weather",
arguments='{"location":"Seattle"}',
)
],
),
Message(
role="tool",
contents=[Content.from_function_result(call_id="call_weather_1", result="Sunny, 72F")],
),
Message(role="assistant", contents=[Content.from_text(text=self._summary_text)]),
]
def run(
self,
messages: str | Content | Message | Sequence[str | Content | Message] | None = None,
*,
stream: bool = False,
session: AgentSession | None = None,
**kwargs: Any,
) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
if stream:
async def _stream() -> AsyncIterable[AgentResponseUpdate]:
yield AgentResponseUpdate(
contents=[
Content.from_function_call(
call_id="call_weather_1",
name="get_weather",
arguments='{"location":"Seattle"}',
)
],
role="assistant",
)
yield AgentResponseUpdate(
contents=[Content.from_function_result(call_id="call_weather_1", result="Sunny, 72F")],
role="tool",
)
yield AgentResponseUpdate(contents=[Content.from_text(text=self._summary_text)], role="assistant")
return ResponseStream(_stream(), finalizer=AgentResponse.from_updates)
async def _run() -> AgentResponse:
return AgentResponse(messages=self._messages())
return _run()
class _CaptureFullConversation(Executor):
"""Captures AgentExecutorResponse.full_conversation and completes the workflow."""
@@ -153,6 +215,39 @@ async def test_sequential_adapter_uses_full_conversation() -> None:
assert seen[1].role == "assistant" and "A1 reply" in (seen[1].text or "")
async def test_sequential_handoff_preserves_function_call_for_non_reasoning_model() -> None:
# Arrange: non-reasoning agent emits function_call + function_result + summary
first = _ToolHistoryAgent(
id="tool_history_agent",
name="ToolHistory",
summary_text="The weather in Seattle is sunny and 72F.",
)
second = _CaptureAgent(id="capture_agent", name="Capture", reply_text="Captured")
wf = SequentialBuilder(participants=[first, second]).build()
# Act
result = await wf.run("Check weather and continue")
# Assert workflow completed
outputs = result.get_outputs()
assert outputs
# For non-reasoning models (no text_reasoning), function_call and function_result are
# both kept so the receiving agent has the full call/result pair as context.
seen = second._last_messages # pyright: ignore[reportPrivateUsage]
assert len(seen) == 4 # user, assistant(function_call), tool(function_result), assistant(summary)
assert seen[0].role == "user"
assert "Check weather and continue" in (seen[0].text or "")
assert seen[1].role == "assistant"
assert any(content.type == "function_call" for content in seen[1].contents)
assert seen[2].role == "tool"
assert any(content.type == "function_result" for content in seen[2].contents)
assert seen[3].role == "assistant"
assert "Seattle is sunny" in (seen[3].text or "")
# No text_reasoning should appear (non-reasoning model)
assert all(content.type != "text_reasoning" for msg in seen for content in msg.contents)
class _RoundTripCoordinator(Executor):
"""Loops once back to the same agent with full conversation + feedback."""
@@ -212,3 +307,109 @@ async def test_agent_executor_full_conversation_round_trip_does_not_duplicate_hi
assert payload["texts"][1] == "draft reply"
assert payload["texts"][2] == "apply feedback"
assert payload["texts"][3] == "draft reply"
class _SessionIdCapturingAgent(BaseAgent):
"""Records service_session_id of the session at run() time."""
_captured_service_session_id: str | None = PrivateAttr(default="NOT_CAPTURED")
def run(
self,
messages: str | Content | Message | Sequence[str | Content | Message] | None = None,
*,
stream: bool = False,
session: AgentSession | None = None,
**kwargs: Any,
) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
self._captured_service_session_id = session.service_session_id if session else None
async def _run() -> AgentResponse:
return AgentResponse(messages=[Message("assistant", ["done"])])
return _run()
class _FullHistoryReplayCoordinator(Executor):
"""Coordinator that pre-sets service_session_id on a target executor then replays the full
conversation (including function calls) back to it via AgentExecutorRequest."""
def __init__(self, *, target_exec: AgentExecutor, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._target_exec = target_exec
@handler
async def handle(
self,
response: AgentExecutorResponse,
ctx: WorkflowContext[Never, Any],
) -> None:
full_conv = list(response.full_conversation or response.agent_response.messages)
full_conv.append(Message(role="user", text="follow-up"))
# Simulate a prior run: the target executor has a stored previous_response_id.
self._target_exec._session.service_session_id = "resp_PREVIOUS_RUN" # pyright: ignore[reportPrivateUsage]
await ctx.send_message(
AgentExecutorRequest(messages=full_conv, should_respond=True),
target_id=self._target_exec.id,
)
@pytest.mark.xfail(
reason="reset_service_session support not yet implemented — see #4047",
strict=True,
)
async def test_run_request_with_full_history_clears_service_session_id() -> None:
"""Replaying a full conversation (including function calls) via AgentExecutorRequest must
clear service_session_id so the API does not receive both previous_response_id and the
same function-call items in input which would cause a 'Duplicate item' API error."""
tool_agent = _ToolHistoryAgent(
id="tool_agent", name="ToolAgent", summary_text="Done."
)
tool_exec = AgentExecutor(tool_agent, id="tool_agent")
spy_agent = _SessionIdCapturingAgent(id="spy_agent", name="SpyAgent")
spy_exec = AgentExecutor(spy_agent, id="spy_agent")
coordinator = _FullHistoryReplayCoordinator(id="coord", target_exec=spy_exec)
wf = (
WorkflowBuilder(start_executor=tool_exec, output_executors=[coordinator])
.add_edge(tool_exec, coordinator)
.add_edge(coordinator, spy_exec)
.build()
)
result = await wf.run("initial prompt")
assert result.get_outputs() is not None
# The spy agent must have seen service_session_id=None (cleared before run).
# Without the fix, it would see "resp_PREVIOUS_RUN" and the API would raise
# "Duplicate item found" because the same function-call IDs appear in both
# previous_response_id (server-stored) and the explicit input messages.
assert spy_agent._captured_service_session_id is None # pyright: ignore[reportPrivateUsage]
async def test_from_response_preserves_service_session_id() -> None:
"""from_response hands off a prior agent's full conversation to the next executor.
The receiving executor's service_session_id is preserved so the API can continue
the conversation using previous_response_id."""
tool_agent = _ToolHistoryAgent(
id="tool_agent2", name="ToolAgent", summary_text="Done."
)
tool_exec = AgentExecutor(tool_agent, id="tool_agent2")
spy_agent = _SessionIdCapturingAgent(id="spy_agent2", name="SpyAgent")
spy_exec = AgentExecutor(spy_agent, id="spy_agent2")
# Simulate a prior run on the spy executor.
spy_exec._session.service_session_id = "resp_PREVIOUS_RUN" # pyright: ignore[reportPrivateUsage]
wf = (
WorkflowBuilder(start_executor=tool_exec, output_executors=[spy_exec])
.add_edge(tool_exec, spy_exec)
.build()
)
result = await wf.run("start")
assert result.get_outputs() is not None
assert spy_agent._captured_service_session_id == "resp_PREVIOUS_RUN" # pyright: ignore[reportPrivateUsage]
@@ -20,7 +20,6 @@ management, enabling persistent conversation history storage across sessions
with Redis as the backend data store.
"""
# Default Redis URL for local Redis Stack.
# Override via the REDIS_URL environment variable for remote or authenticated instances.
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
@@ -153,7 +153,7 @@ class Coordinator(Executor):
# Human approved the draft as-is; forward it unchanged.
await ctx.send_message(
AgentExecutorRequest(
messages=original_request.conversation + [Message("user", text="The draft is approved as-is.")],
messages=[*original_request.conversation, *[Message("user", text="The draft is approved as-is.")]],
should_respond=True,
),
target_id=self.final_editor_id,
@@ -161,16 +161,15 @@ class Coordinator(Executor):
return
# Human provided feedback; prompt the writer to revise.
conversation: list[Message] = list(original_request.conversation)
instruction = (
"A human reviewer shared the following guidance:\n"
f"{note or 'No specific guidance provided.'}\n\n"
"Rewrite the draft from the previous assistant message into a polished final version. "
"Keep the response under 120 words and reflect any requested tone adjustments."
)
conversation.append(Message("user", text=instruction))
await ctx.send_message(
AgentExecutorRequest(messages=conversation, should_respond=True), target_id=self.writer_id
AgentExecutorRequest(messages=[Message("user", text=instruction)], should_respond=True),
target_id=self.writer_id,
)
@@ -123,7 +123,8 @@ class Coordinator(Executor):
)
conversation.append(Message("user", text=instruction))
await ctx.send_message(
AgentExecutorRequest(messages=conversation, should_respond=True), target_id=self.writer_name
AgentExecutorRequest(messages=conversation, should_respond=True),
target_id=self.writer_name,
)
@@ -144,7 +144,9 @@ async def run_agent_framework_with_cycle() -> None:
if last_message and "APPROVED" in last_message.text:
await context.yield_output("Content approved.")
else:
await context.send_message(AgentExecutorRequest(messages=response.full_conversation, should_respond=True))
await context.send_message(
AgentExecutorRequest(messages=response.full_conversation, should_respond=True)
)
workflow = (
WorkflowBuilder(start_executor=researcher)