Files
agent-framework/python/packages/foundry_hosting/tests/test_responses.py
T
Tao Chen 9cafd7e58b Python: Refactor workflow as agent pending request handling (#6259)
* WIP: Refactor Workflow as agent pending request handling

* WIP: debugging empty message bug

* Working: Workflow as agent with function approval

* Address Copilot comments

* Fix mypy

* Address comments and fix pipeline

* Request info non function approval now becomes function call

* Revert uv.lock

* Fix mypy

* Bump min version of azure-ai-project

* Remove RequestInfoFunctionArgs

* fix tests

* Fix failing tests

* Fix sample
2026-06-05 17:23:19 +00:00

3976 lines
158 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
"""HTTP round-trip tests for ResponsesHostServer.
These tests exercise the full HTTP pipeline using httpx.AsyncClient with
ASGITransport — no real server process is started. Requests go through
the Starlette routing stack, the Responses API middleware, and arrive at
the registered _handle_create handler.
"""
from __future__ import annotations
import json
import uuid
from collections.abc import AsyncIterator, Awaitable, Callable, Sequence
from dataclasses import dataclass
from typing import Literal, overload
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
from agent_framework import (
AgentExecutorRequest,
AgentResponse,
AgentResponseUpdate,
AgentSession,
Content,
FileCheckpointStorage,
HistoryProvider,
Message,
RawAgent,
ResponseStream,
SupportsAgentRun,
WorkflowAgent,
WorkflowBuilder,
WorkflowCheckpoint,
WorkflowCheckpointException,
WorkflowContext,
WorkflowMessage,
executor,
)
from azure.ai.agentserver.responses import InMemoryResponseProvider
from mcp import McpError
from mcp.types import ErrorData
from typing_extensions import Any
from agent_framework_foundry_hosting import ResponsesHostServer
from agent_framework_foundry_hosting._responses import (
_AZURE_RESPONSES_MESSAGE_ROLE_TYPE, # pyright: ignore[reportPrivateUsage]
CONSENT_ERROR_CODE,
ConsentError,
FileBasedFunctionApprovalStorage, # pyright: ignore[reportPrivateUsage]
InMemoryFunctionApprovalStorage, # pyright: ignore[reportPrivateUsage]
_item_to_message, # pyright: ignore[reportPrivateUsage]
_output_item_to_message, # pyright: ignore[reportPrivateUsage]
consent_url_from_error,
)
def _make_function_approval_request_content(
    *,
    request_id: str = "apr_test",
    call_id: str = "call_1",
    name: str = "delete_file",
    arguments: str = '{"path": "/foo"}',
    server_label: str = "my_server",
) -> Content:
    """Create an approval-request ``Content`` that wraps a function call.

    The wrapped function call is built from ``call_id``, ``name`` and
    ``arguments`` and carries ``server_label`` in its additional properties.
    ``request_id`` is passed to ``Content.from_function_approval_request``
    together with that function call.
    """
    extra_properties = {"server_label": server_label}
    wrapped_call = Content.from_function_call(
        call_id,
        name,
        arguments=arguments,
        additional_properties=extra_properties,
    )
    return Content.from_function_approval_request(request_id, wrapped_call)
# region Helpers
def _make_agent(
    *,
    response: AgentResponse | None = None,
    stream_updates: list[AgentResponseUpdate] | None = None,
    raw_agent: bool = True,
) -> MagicMock:
    """Build a mock agent whose ``run`` replays canned output.

    Args:
        response: When given, ``run`` becomes an ``AsyncMock`` that returns
            this object no matter how it is called.
        stream_updates: When given, ``run`` becomes a ``MagicMock`` that
            returns a ``ResponseStream`` over these updates if called with a
            truthy ``stream`` keyword and raises ``NotImplementedError``
            otherwise. It is installed after ``response`` and therefore
            replaces it when both are supplied.
        raw_agent: Restrict the mock to the ``RawAgent`` spec when true;
            otherwise an unrestricted ``MagicMock`` is used.

    Returns:
        The configured mock agent.
    """
    if raw_agent:
        mock_agent = MagicMock(spec=RawAgent)
    else:
        mock_agent = MagicMock()

    mock_agent.id = "test-agent"
    mock_agent.name = "Test Agent"
    mock_agent.description = "A mock agent for testing"
    mock_agent.context_providers = []

    if response is not None:

        async def _return_canned_response(*args: Any, **kwargs: Any) -> AgentResponse:
            return response

        mock_agent.run = AsyncMock(side_effect=_return_canned_response)

    if stream_updates is not None:

        async def _replay_updates() -> AsyncIterator[AgentResponseUpdate]:
            for item in stream_updates:
                yield item

        def _return_stream(*args: Any, **kwargs: Any) -> Any:
            # Only the streaming call path is supported by this configuration.
            if not kwargs.get("stream"):
                raise NotImplementedError("Only streaming is configured on this mock")
            return ResponseStream(_replay_updates())  # type: ignore

        mock_agent.run = MagicMock(side_effect=_return_stream)

    return mock_agent
def _make_server(agent: Any, **kwargs: Any) -> ResponsesHostServer:
    """Wrap ``agent`` in a ``ResponsesHostServer`` backed by a fresh in-memory store.

    Any extra keyword arguments are forwarded to the ``ResponsesHostServer``
    constructor unchanged.
    """
    in_memory_store = InMemoryResponseProvider()
    return ResponsesHostServer(agent, store=in_memory_store, **kwargs)
async def _post(
    server: ResponsesHostServer,
    *,
    input_text: str = "Hello",
    model: str = "test-model",
    stream: bool = False,
    temperature: float | None = None,
    top_p: float | None = None,
    max_output_tokens: int | None = None,
    parallel_tool_calls: bool | None = None,
) -> httpx.Response:
    """POST a create-response request to ``/responses`` via the in-process ASGI transport.

    ``model``, ``input`` and ``stream`` are always present in the JSON body.
    Each of the optional sampling parameters is included only when it is not
    ``None``.

    Returns:
        The ``httpx.Response`` produced by ``server``.
    """
    payload: dict[str, Any] = {"model": model, "input": input_text, "stream": stream}

    # Insertion order mirrors the order in which the options are declared.
    optional_fields: dict[str, Any] = {
        "temperature": temperature,
        "top_p": top_p,
        "max_output_tokens": max_output_tokens,
        "parallel_tool_calls": parallel_tool_calls,
    }
    payload.update({key: value for key, value in optional_fields.items() if value is not None})

    asgi_transport = httpx.ASGITransport(app=server)
    async with httpx.AsyncClient(transport=asgi_transport, base_url="http://test") as http_client:
        return await http_client.post("/responses", json=payload)
def _parse_sse_events(body: str) -> list[dict[str, Any]]:
    """Parse SSE text into a list of event dicts with 'event' and 'data' keys.

    Lines are grouped into blocks terminated by a blank line. A block that
    contained an ``event: `` line yields one dict; its ``data: `` lines are
    joined with newlines and decoded as JSON, falling back to the raw joined
    string when decoding fails.

    Blocks without an ``event: `` line are discarded together with their data,
    so stale data can never leak into the following event. A trailing block
    that is not terminated by a blank line is not emitted. Both LF and CRLF
    line endings are accepted.

    Args:
        body: The complete response body of an event stream.

    Returns:
        The parsed events in the order they appear in ``body``.
    """
    events: list[dict[str, Any]] = []
    current_event: str | None = None
    current_data_lines: list[str] = []
    for raw_line in body.split("\n"):
        # Tolerate CRLF-delimited streams: drop the carriage return so it does
        # not end up inside event names.
        line = raw_line.rstrip("\r")
        if line.startswith("event: "):
            current_event = line[len("event: ") :]
        elif line.startswith("data: "):
            current_data_lines.append(line[len("data: ") :])
        elif line.strip() == "":
            if current_event is not None:
                data_str = "\n".join(current_data_lines)
                try:
                    data = json.loads(data_str)
                except json.JSONDecodeError:
                    data = data_str
                events.append({"event": current_event, "data": data})
            # A blank line always ends the current block, even when the block
            # had no event name and therefore produced nothing.
            current_event = None
            current_data_lines = []
    return events
def _sse_event_types(events: list[dict[str, Any]]) -> list[str]:
    """Return the ``event`` value of every parsed SSE event, in order."""
    event_names: list[str] = []
    for parsed in events:
        event_names.append(parsed["event"])
    return event_names
# endregion
# region Initialization
class TestResponsesHostServerInit:
    """Construction-time behaviour of ``ResponsesHostServer``."""

    def test_init_basic(self) -> None:
        """A server can be constructed around a mock agent."""
        canned = AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text("hi")])])
        server = _make_server(_make_agent(response=canned))
        assert server is not None

    def test_init_rejects_history_provider_with_load_messages(self) -> None:
        """An agent carrying a ``load_messages=True`` history provider is rejected."""
        history_provider = HistoryProvider(source_id="test", load_messages=True)
        canned = AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text("hi")])])
        agent = _make_agent(response=canned)
        agent.context_providers = [history_provider]

        with pytest.raises(RuntimeError, match="history provider"):
            ResponsesHostServer(agent)
# endregion
# region Health Check
class TestHealthCheck:
    """Checks for the server's health endpoint."""

    async def test_readiness(self) -> None:
        """``GET /readiness`` answers with HTTP 200."""
        canned = AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text("hi")])])
        server = _make_server(_make_agent(response=canned))

        asgi_transport = httpx.ASGITransport(app=server)
        async with httpx.AsyncClient(transport=asgi_transport, base_url="http://test") as http_client:
            readiness = await http_client.get("/readiness")

        assert readiness.status_code == 200
# endregion
# region Non-streaming
class TestNonStreaming:
    """Round-trip tests for ``stream=False`` requests to ``POST /responses``."""

    async def test_basic_text_response(self) -> None:
        """A single assistant text message comes back as a completed JSON response."""
        reply = AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text("Hello!")])])
        server = _make_server(_make_agent(response=reply))

        http_response = await _post(server, input_text="Hi", stream=False)

        assert http_response.status_code == 200
        assert "application/json" in http_response.headers["content-type"]
        body = http_response.json()
        assert body["object"] == "response"
        assert body["status"] == "completed"
        assert len(body["output"]) > 0

        # Every output item must be a message; collect the parts carrying our text.
        matching_parts: list[Any] = []
        for item in body["output"]:
            assert item["type"] == "message"
            matching_parts.extend(
                part
                for part in item.get("content", [])
                if part.get("type") == "output_text" and part.get("text") == "Hello!"
            )
        assert matching_parts, f"Expected 'Hello!' in output, got: {body['output']}"

    async def test_function_call_and_result(self) -> None:
        """Function call, its result and the final text each appear as an output item."""
        conversation = [
            Message(
                role="assistant",
                contents=[Content.from_function_call("call_1", "get_weather", arguments='{"loc": "NYC"}')],
            ),
            Message(role="tool", contents=[Content.from_function_result("call_1", result="sunny")]),
            Message(role="assistant", contents=[Content.from_text("The weather is sunny!")]),
        ]
        server = _make_server(_make_agent(response=AgentResponse(messages=conversation)))

        http_response = await _post(server, stream=False)

        assert http_response.status_code == 200
        body = http_response.json()
        assert body["status"] == "completed"
        item_types = [item["type"] for item in body["output"]]
        for expected_type in ("function_call", "function_call_output", "message"):
            assert expected_type in item_types

    async def test_hosted_mcp_call_and_result_persist_as_single_mcp_call(self) -> None:
        """A hosted MCP call plus its result collapse into one ``mcp_call`` output item."""
        mcp_call = Content.from_mcp_server_tool_call(
            call_id="mcp_abc123",
            tool_name="search",
            server_name="api_specs",
            arguments='{"q": "cats"}',
        )
        mcp_result = Content.from_mcp_server_tool_result(
            call_id="mcp_abc123",
            output=[Content.from_text(text="found 10 cats")],
        )
        conversation = [
            Message(role="assistant", contents=[mcp_call]),
            Message(role="tool", contents=[mcp_result]),
            Message(role="assistant", contents=[Content.from_text("I found 10 cats!")]),
        ]
        server = _make_server(_make_agent(response=AgentResponse(messages=conversation)))

        http_response = await _post(server, stream=False)

        assert http_response.status_code == 200
        body = http_response.json()
        assert body["status"] == "completed"
        item_types = [item["type"] for item in body["output"]]
        assert "mcp_call" in item_types
        assert "custom_tool_call_output" not in item_types

        mcp_items = [item for item in body["output"] if item["type"] == "mcp_call"]
        assert len(mcp_items) == 1
        only_mcp_item = mcp_items[0]
        assert only_mcp_item["id"] == "mcp_abc123"
        assert only_mcp_item["output"] == "found 10 cats"

    async def test_reasoning_content(self) -> None:
        """Reasoning and text contents produce ``reasoning`` and ``message`` output items."""
        reasoning_message = Message(
            role="assistant",
            contents=[
                Content.from_text_reasoning(text="Let me think..."),
                Content.from_text("The answer is 42"),
            ],
        )
        server = _make_server(_make_agent(response=AgentResponse(messages=[reasoning_message])))

        http_response = await _post(server, stream=False)

        assert http_response.status_code == 200
        body = http_response.json()
        assert body["status"] == "completed"
        item_types = [item["type"] for item in body["output"]]
        for expected_type in ("reasoning", "message"):
            assert expected_type in item_types

    async def test_empty_response(self) -> None:
        """An agent response without messages still completes successfully."""
        server = _make_server(_make_agent(response=AgentResponse(messages=[])))

        http_response = await _post(server, stream=False)

        assert http_response.status_code == 200
        assert http_response.json()["status"] == "completed"

    async def test_chat_options_forwarded(self) -> None:
        """Sampling options from the request are passed to ``agent.run`` as ``options``."""
        reply = AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text("ok")])])
        agent = _make_agent(response=reply, raw_agent=True)
        server = _make_server(agent)

        http_response = await _post(
            server,
            stream=False,
            temperature=0.5,
            top_p=0.9,
            max_output_tokens=1024,
            parallel_tool_calls=True,
        )

        assert http_response.status_code == 200
        agent.run.assert_awaited_once()
        forwarded = agent.run.call_args.kwargs
        assert forwarded["stream"] is False
        options = forwarded["options"]
        assert options["temperature"] == 0.5
        assert options["top_p"] == 0.9
        assert options["max_tokens"] == 1024
        assert options["allow_multiple_tool_calls"] is True
# endregion
# region Streaming
class TestStreaming:
    """Round-trip tests for ``stream=True`` requests to ``POST /responses``."""

    async def test_chat_options_forwarded(self) -> None:
        """Sampling options from the request are passed to ``agent.run`` as ``options``."""
        updates = [AgentResponseUpdate(contents=[Content.from_text("ok")], role="assistant")]
        agent = _make_agent(stream_updates=updates, raw_agent=True)
        server = _make_server(agent)

        http_response = await _post(
            server,
            stream=True,
            temperature=0.5,
            top_p=0.9,
            max_output_tokens=1024,
            parallel_tool_calls=True,
        )

        assert http_response.status_code == 200
        agent.run.assert_called_once()
        forwarded = agent.run.call_args.kwargs
        assert forwarded["stream"] is True
        options = forwarded["options"]
        assert options["temperature"] == 0.5
        assert options["top_p"] == 0.9
        assert options["max_tokens"] == 1024
        assert options["allow_multiple_tool_calls"] is True

    async def test_basic_text_streaming(self) -> None:
        """Two text updates stream as two deltas and one accumulated ``done`` event."""
        updates = [
            AgentResponseUpdate(contents=[Content.from_text(chunk)], role="assistant")
            for chunk in ("Hello ", "world!")
        ]
        server = _make_server(_make_agent(stream_updates=updates))

        http_response = await _post(server, stream=True)

        assert http_response.status_code == 200
        assert "text/event-stream" in http_response.headers["content-type"]
        events = _parse_sse_events(http_response.text)
        event_types = _sse_event_types(events)
        assert event_types[0] == "response.created"
        assert event_types[1] == "response.in_progress"
        assert event_types[-1] == "response.completed"
        assert "response.output_text.delta" in event_types
        assert event_types.count("response.output_text.delta") == 2
        assert "response.output_text.done" in event_types

        # The done event carries the concatenation of both deltas.
        text_done = [event for event in events if event["event"] == "response.output_text.done"]
        assert len(text_done) == 1
        assert text_done[0]["data"]["text"] == "Hello world!"

    async def test_function_call_streaming(self) -> None:
        """Argument fragments of one call stream as deltas and accumulate in ``done``."""
        updates = [
            AgentResponseUpdate(
                contents=[Content.from_function_call("call_1", "search", arguments=fragment)],
                role="assistant",
            )
            for fragment in ('{"q":', ' "hello"}')
        ]
        server = _make_server(_make_agent(stream_updates=updates))

        http_response = await _post(server, stream=True)

        assert http_response.status_code == 200
        events = _parse_sse_events(http_response.text)
        event_types = _sse_event_types(events)
        assert event_types[0] == "response.created"
        assert event_types[-1] == "response.completed"
        assert event_types.count("response.function_call_arguments.delta") == 2
        assert "response.function_call_arguments.done" in event_types

        # The done event carries the concatenated argument string.
        args_done = [event for event in events if event["event"] == "response.function_call_arguments.done"]
        assert len(args_done) == 1
        assert args_done[0]["data"]["arguments"] == '{"q": "hello"}'

    async def test_function_call_streaming_serializes_dataclass_arguments(self) -> None:
        """Dataclass-valued function-call arguments are emitted as JSON text."""

        @dataclass
        class HandoffLikeRequest:
            agent_response: AgentResponse

        nested_response = AgentResponse(
            messages=[Message(role="assistant", contents=[Content.from_text("Need more details")])]
        )
        request = HandoffLikeRequest(agent_response=nested_response)
        updates = [
            AgentResponseUpdate(
                contents=[Content.from_function_call("call_1", "handoff_to_refund", arguments=request)],
                role="assistant",
            ),
        ]
        server = _make_server(_make_agent(stream_updates=updates))

        http_response = await _post(server, stream=True)

        assert http_response.status_code == 200
        events = _parse_sse_events(http_response.text)
        args_done = [event for event in events if event["event"] == "response.function_call_arguments.done"]
        assert len(args_done) == 1
        decoded = json.loads(args_done[0]["data"]["arguments"])
        assert decoded["agent_response"]["type"] == "agent_response"
        assert decoded["agent_response"]["messages"][0]["contents"][0]["text"] == "Need more details"

    async def test_alternating_text_and_function_call(self) -> None:
        """Text, a function call, then text again yield three separate output items."""
        leading_text = [
            AgentResponseUpdate(contents=[Content.from_text(chunk)], role="assistant")
            for chunk in ("Let me ", "search...")
        ]
        call_fragments = [
            AgentResponseUpdate(
                contents=[Content.from_function_call("call_1", "search", arguments=fragment)],
                role="assistant",
            )
            for fragment in ('{"q":', ' "x"}')
        ]
        trailing_text = [
            AgentResponseUpdate(contents=[Content.from_text(chunk)], role="assistant")
            for chunk in ("Found ", "it!")
        ]
        agent = _make_agent(stream_updates=[*leading_text, *call_fragments, *trailing_text])
        server = _make_server(agent)

        http_response = await _post(server, stream=True)

        assert http_response.status_code == 200
        events = _parse_sse_events(http_response.text)
        event_types = _sse_event_types(events)
        assert event_types[0] == "response.created"
        assert event_types[-1] == "response.completed"

        # Four text deltas and two argument deltas in total.
        assert event_types.count("response.output_text.delta") == 4
        assert event_types.count("response.function_call_arguments.delta") == 2

        # Text, function call and text again: three distinct output items.
        assert event_types.count("response.output_item.added") == 3
        assert event_types.count("response.output_item.done") == 3

        # Each item accumulates only its own deltas.
        text_done = [event for event in events if event["event"] == "response.output_text.done"]
        assert len(text_done) == 2
        assert text_done[0]["data"]["text"] == "Let me search..."
        assert text_done[1]["data"]["text"] == "Found it!"
        args_done = [event for event in events if event["event"] == "response.function_call_arguments.done"]
        assert len(args_done) == 1
        assert args_done[0]["data"]["arguments"] == '{"q": "x"}'

    async def test_reasoning_then_text_streaming(self) -> None:
        """Reasoning followed by text yields two output items and one text ``done`` event."""
        reasoning_updates = [
            AgentResponseUpdate(contents=[Content.from_text_reasoning(text=chunk)], role="assistant")
            for chunk in ("Let me ", "think...")
        ]
        text_updates = [
            AgentResponseUpdate(contents=[Content.from_text(chunk)], role="assistant")
            for chunk in ("The answer ", "is 42")
        ]
        server = _make_server(_make_agent(stream_updates=[*reasoning_updates, *text_updates]))

        http_response = await _post(server, stream=True)

        assert http_response.status_code == 200
        events = _parse_sse_events(http_response.text)
        event_types = _sse_event_types(events)
        assert event_types[0] == "response.created"
        assert event_types[-1] == "response.completed"

        # One reasoning item plus one text item.
        assert event_types.count("response.output_item.added") == 2
        assert event_types.count("response.output_item.done") == 2
        assert event_types.count("response.output_text.delta") == 2

        text_done = [event for event in events if event["event"] == "response.output_text.done"]
        assert len(text_done) == 1
        assert text_done[0]["data"]["text"] == "The answer is 42"

    async def test_empty_streaming(self) -> None:
        """A stream without updates emits only the three lifecycle events."""
        server = _make_server(_make_agent(stream_updates=[]))

        http_response = await _post(server, stream=True)

        assert http_response.status_code == 200
        event_types = _sse_event_types(_parse_sse_events(http_response.text))
        assert event_types == ["response.created", "response.in_progress", "response.completed"]

    async def test_mixed_contents_in_single_update(self) -> None:
        """Text and function call in one update switches builder mid-update."""
        mixed_update = AgentResponseUpdate(
            contents=[
                Content.from_text("Let me search"),
                Content.from_function_call("call_1", "search", arguments='{"q": "test"}'),
            ],
            role="assistant",
        )
        server = _make_server(_make_agent(stream_updates=[mixed_update]))

        http_response = await _post(server, stream=True)

        assert http_response.status_code == 200
        event_types = _sse_event_types(_parse_sse_events(http_response.text))
        for expected_type in (
            "response.output_text.delta",
            "response.output_text.done",
            "response.function_call_arguments.delta",
            "response.function_call_arguments.done",
        ):
            assert expected_type in event_types

    async def test_different_function_call_ids_produce_separate_items(self) -> None:
        """Function calls with distinct call ids become distinct output items."""
        calls = (
            ("call_1", "func_a", '{"x":1}'),
            ("call_2", "func_b", '{"y":2}'),
        )
        updates = [
            AgentResponseUpdate(
                contents=[Content.from_function_call(call_id, func_name, arguments=raw_args)],
                role="assistant",
            )
            for call_id, func_name, raw_args in calls
        ]
        server = _make_server(_make_agent(stream_updates=updates))

        http_response = await _post(server, stream=True)

        assert http_response.status_code == 200
        event_types = _sse_event_types(_parse_sse_events(http_response.text))
        # One output item, and one arguments-done event, per call id.
        assert event_types.count("response.output_item.added") == 2
        assert event_types.count("response.function_call_arguments.done") == 2

    async def test_mcp_tool_call_streaming(self) -> None:
        """Streamed MCP tool-call fragments produce output-item added/done events."""
        updates = [
            AgentResponseUpdate(
                contents=[
                    Content(
                        type="mcp_server_tool_call",
                        server_name="my_server",
                        tool_name="search",
                        arguments=fragment,
                    )
                ],
                role="assistant",
            )
            for fragment in ('{"query":', ' "test"}')
        ]
        server = _make_server(_make_agent(stream_updates=updates))

        http_response = await _post(server, stream=True)

        assert http_response.status_code == 200
        event_types = _sse_event_types(_parse_sse_events(http_response.text))
        assert event_types[0] == "response.created"
        assert event_types[-1] == "response.completed"
        assert "response.output_item.added" in event_types
        assert "response.output_item.done" in event_types

    async def test_mcp_tool_call_and_result_streaming_emit_single_completed_mcp_call(self) -> None:
        """A streamed MCP call and its result finish as a single ``mcp_call`` item."""
        call_updates = [
            AgentResponseUpdate(
                contents=[
                    Content.from_mcp_server_tool_call(
                        call_id="mcp_abc123",
                        tool_name="search",
                        server_name="api_specs",
                        arguments=fragment,
                    )
                ],
                role="assistant",
            )
            for fragment in ('{"q":', ' "cats"}')
        ]
        result_update = AgentResponseUpdate(
            contents=[
                Content.from_mcp_server_tool_result(
                    call_id="mcp_abc123",
                    output=[Content.from_text(text="found 10 cats")],
                )
            ],
            role="tool",
        )
        server = _make_server(_make_agent(stream_updates=[*call_updates, result_update]))

        http_response = await _post(server, stream=True)

        assert http_response.status_code == 200
        events = _parse_sse_events(http_response.text)
        item_done = [event for event in events if event["event"] == "response.output_item.done"]
        assert len(item_done) == 1
        finished_item = item_done[0]["data"]["item"]
        assert finished_item["type"] == "mcp_call"
        assert finished_item["id"] == "mcp_abc123"
        assert finished_item["output"] == "found 10 cats"
# endregion
# region _output_item_to_message conversion
class TestOutputItemToMessage:
"""Tests for _output_item_to_message covering all supported OutputItem types."""
async def test_output_message(self) -> None:
    """An ``output_message`` item becomes an assistant message with one text content."""
    from azure.ai.agentserver.responses.models import OutputItemOutputMessage, OutputMessageContentOutputTextContent

    text_part = OutputMessageContentOutputTextContent({"type": "output_text", "text": "hello"})
    output_item = OutputItemOutputMessage({
        "type": "output_message",
        "role": "assistant",
        "content": [text_part],
        "status": "completed",
        "id": "msg-1",
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "assistant"
    assert len(message.contents) == 1
    first_content = message.contents[0]
    assert first_content.type == "text"
    assert first_content.text == "hello"
async def test_message(self) -> None:
    """A ``message`` item keeps its role and its single text content."""
    from azure.ai.agentserver.responses.models import MessageContentInputTextContent, OutputItemMessage

    text_part = MessageContentInputTextContent({"type": "input_text", "text": "hi"})
    output_item = OutputItemMessage({
        "type": "message",
        "role": "user",
        "content": [text_part],
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "user"
    assert len(message.contents) == 1
    assert message.contents[0].text == "hi"
async def test_function_call(self) -> None:
    """A ``function_call`` item becomes an assistant ``function_call`` content."""
    from azure.ai.agentserver.responses.models import OutputItemFunctionToolCall

    output_item = OutputItemFunctionToolCall({
        "type": "function_call",
        "call_id": "call_1",
        "name": "get_weather",
        "arguments": '{"city": "NYC"}',
        "status": "completed",
        "id": "fc-1",
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "assistant"
    first_content = message.contents[0]
    assert first_content.type == "function_call"
    assert first_content.call_id == "call_1"
    assert first_content.name == "get_weather"
async def test_function_call_output(self) -> None:
    """A ``function_call_output`` item becomes a tool ``function_result`` content."""
    from azure.ai.agentserver.responses.models import FunctionCallOutputItemParam

    output_item = FunctionCallOutputItemParam({
        "type": "function_call_output",
        "call_id": "call_1",
        "output": "sunny",
    })

    message = await _output_item_to_message(output_item)  # type: ignore[arg-type]

    assert message.role == "tool"
    first_content = message.contents[0]
    assert first_content.type == "function_result"
    assert first_content.call_id == "call_1"
    assert first_content.result == "sunny"
async def test_reasoning(self) -> None:
    """A ``reasoning`` item with one summary yields one content carrying that text."""
    from azure.ai.agentserver.responses.models import OutputItemReasoningItem, SummaryTextContent

    summary_part = SummaryTextContent({"type": "summary_text", "text": "thinking hard"})
    output_item = OutputItemReasoningItem({
        "type": "reasoning",
        "id": "r-1",
        "summary": [summary_part],
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "assistant"
    assert len(message.contents) == 1
    assert message.contents[0].text == "thinking hard"
async def test_reasoning_no_summary(self) -> None:
    """A ``reasoning`` item without a summary yields an assistant message with no contents."""
    from azure.ai.agentserver.responses.models import OutputItemReasoningItem

    output_item = OutputItemReasoningItem({"type": "reasoning", "id": "r-2"})

    message = await _output_item_to_message(output_item)

    assert message.role == "assistant"
    assert message.contents == []
async def test_mcp_call(self) -> None:
    """An ``mcp_call`` item becomes an assistant ``mcp_server_tool_call`` content."""
    from azure.ai.agentserver.responses.models import OutputItemMcpToolCall

    output_item = OutputItemMcpToolCall({
        "type": "mcp_call",
        "id": "mcp-1",
        "server_label": "my_server",
        "name": "search",
        "arguments": '{"q": "test"}',
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "assistant"
    first_content = message.contents[0]
    assert first_content.type == "mcp_server_tool_call"
    assert first_content.server_name == "my_server"
    assert first_content.tool_name == "search"
async def test_mcp_call_with_output_reconstructs_mcp_result_content(self) -> None:
    """An ``mcp_call`` item with ``output`` yields a call content followed by a result content."""
    from azure.ai.agentserver.responses.models import OutputItemMcpToolCall

    output_item = OutputItemMcpToolCall({
        "type": "mcp_call",
        "id": "mcp-1",
        "server_label": "my_server",
        "name": "search",
        "arguments": '{"q": "test"}',
        "output": "found 10 cats",
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "assistant"
    assert len(message.contents) == 2
    call_content = message.contents[0]
    result_content = message.contents[1]
    assert call_content.type == "mcp_server_tool_call"
    assert result_content.type == "mcp_server_tool_result"
    assert result_content.output == "found 10 cats"
async def test_mcp_approval_request(self) -> None:
    """An ``mcp_approval_request`` item with a stored request becomes a ``function_approval_request``."""
    from azure.ai.agentserver.responses.models import OutputItemMcpApprovalRequest

    # Seed the storage with the approval request the item refers to by id.
    approval_storage = InMemoryFunctionApprovalStorage()
    stored_request = _make_function_approval_request_content(request_id="apr-1")
    await approval_storage.save_approval_request("apr-1", stored_request)

    output_item = OutputItemMcpApprovalRequest({
        "type": "mcp_approval_request",
        "id": "apr-1",
        "server_label": "srv",
        "name": "dangerous_tool",
        "arguments": "{}",
    })

    message = await _output_item_to_message(output_item, approval_storage=approval_storage)

    assert message.role == "assistant"
    assert message.contents[0].type == "function_approval_request"
async def test_mcp_approval_response(self) -> None:
    """An approving ``mcp_approval_response`` item becomes a user ``function_approval_response``."""
    from azure.ai.agentserver.responses.models import OutputItemMcpApprovalResponseResource

    # Seed the storage with the approval request the response refers to.
    approval_storage = InMemoryFunctionApprovalStorage()
    stored_request = _make_function_approval_request_content(request_id="apr-1")
    await approval_storage.save_approval_request("apr-1", stored_request)

    output_item = OutputItemMcpApprovalResponseResource({
        "type": "mcp_approval_response",
        "id": "resp-1",
        "approval_request_id": "apr-1",
        "approve": True,
    })

    message = await _output_item_to_message(output_item, approval_storage=approval_storage)

    assert message.role == "user"
    first_content = message.contents[0]
    assert first_content.type == "function_approval_response"
    assert first_content.approved is True
async def test_code_interpreter_call(self) -> None:
    """A ``code_interpreter_call`` item becomes an assistant ``code_interpreter_tool_call``."""
    from azure.ai.agentserver.responses.models import OutputItemCodeInterpreterToolCall

    output_item = OutputItemCodeInterpreterToolCall({
        "type": "code_interpreter_call",
        "id": "ci-1",
        "status": "completed",
        "container_id": "c-1",
        "code": "print('hi')",
        "outputs": [],
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "assistant"
    assert message.contents[0].type == "code_interpreter_tool_call"
async def test_image_generation_call(self) -> None:
    """An ``image_generation_call`` item becomes an assistant ``image_generation_tool_call``."""
    from azure.ai.agentserver.responses.models import OutputItemImageGenToolCall

    output_item = OutputItemImageGenToolCall({
        "type": "image_generation_call",
        "id": "ig-1",
        "status": "completed",
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "assistant"
    assert message.contents[0].type == "image_generation_tool_call"
async def test_shell_call(self) -> None:
    """A ``shell_call`` item becomes an assistant ``shell_tool_call`` with its commands and call id."""
    from azure.ai.agentserver.responses.models import (
        FunctionShellAction,
        FunctionShellCallEnvironment,
        OutputItemFunctionShellCall,
    )

    shell_action = FunctionShellAction({"commands": ["ls", "-la"], "timeout_ms": 5000, "max_output_length": 1024})
    shell_environment = FunctionShellCallEnvironment({"type": "local"})
    output_item = OutputItemFunctionShellCall({
        "type": "shell_call",
        "id": "sc-1",
        "call_id": "call_sc",
        "action": shell_action,
        "status": "completed",
        "environment": shell_environment,
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "assistant"
    first_content = message.contents[0]
    assert first_content.type == "shell_tool_call"
    assert first_content.commands == ["ls", "-la"]
    assert first_content.call_id == "call_sc"
async def test_shell_call_output(self) -> None:
    """A ``shell_call_output`` item becomes a tool ``shell_tool_result`` with the same call id."""
    from azure.ai.agentserver.responses.models import (
        FunctionShellCallOutputContent,
        FunctionShellCallOutputExitOutcome,
        OutputItemFunctionShellCallOutput,
    )

    command_output = FunctionShellCallOutputContent({
        "stdout": "file.txt",
        "stderr": "",
        "outcome": FunctionShellCallOutputExitOutcome({"exit_code": 0}),
    })
    output_item = OutputItemFunctionShellCallOutput({
        "type": "shell_call_output",
        "id": "sco-1",
        "call_id": "call_sc",
        "status": "completed",
        "output": [command_output],
        "max_output_length": 1024,
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "tool"
    first_content = message.contents[0]
    assert first_content.type == "shell_tool_result"
    assert first_content.call_id == "call_sc"
async def test_local_shell_call(self) -> None:
    """A ``local_shell_call`` item becomes an assistant ``shell_tool_call`` with its command list."""
    from azure.ai.agentserver.responses.models import LocalShellExecAction, OutputItemLocalShellToolCall

    exec_action = LocalShellExecAction({"type": "exec", "command": ["echo", "hello"], "env": {}})
    output_item = OutputItemLocalShellToolCall({
        "type": "local_shell_call",
        "id": "lsc-1",
        "call_id": "call_lsc",
        "action": exec_action,
        "status": "completed",
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "assistant"
    first_content = message.contents[0]
    assert first_content.type == "shell_tool_call"
    assert first_content.commands == ["echo", "hello"]
async def test_local_shell_call_output(self) -> None:
    """A ``local_shell_call_output`` item becomes a tool ``shell_tool_result`` content."""
    from azure.ai.agentserver.responses.models import OutputItemLocalShellToolCallOutput

    item_fields = {
        "type": "local_shell_call_output",
        "id": "lsco-1",
        "output": "hello\n",
    }
    output_item = OutputItemLocalShellToolCallOutput(item_fields)

    message = await _output_item_to_message(output_item)

    assert message.role == "tool"
    assert message.contents[0].type == "shell_tool_result"
async def test_file_search_call(self) -> None:
    """A ``file_search_call`` item becomes a ``file_search`` function call carrying its query."""
    from azure.ai.agentserver.responses.models import OutputItemFileSearchToolCall

    output_item = OutputItemFileSearchToolCall({
        "type": "file_search_call",
        "id": "fs-1",
        "status": "completed",
        "queries": ["what is AI"],
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "assistant"
    first_content = message.contents[0]
    assert first_content.type == "function_call"
    assert first_content.name == "file_search"
    # The query text must appear, quoted, in the serialized arguments.
    assert '"what is AI"' in (first_content.arguments or "")
async def test_web_search_call(self) -> None:
    """A ``web_search_call`` item becomes a ``web_search`` function call content."""
    from azure.ai.agentserver.responses.models import OutputItemWebSearchToolCall, WebSearchActionSearch

    search_action = WebSearchActionSearch({"type": "search", "query": "test"})
    output_item = OutputItemWebSearchToolCall({
        "type": "web_search_call",
        "id": "ws-1",
        "status": "completed",
        "action": search_action,
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "assistant"
    first_content = message.contents[0]
    assert first_content.type == "function_call"
    assert first_content.name == "web_search"
async def test_computer_call(self) -> None:
    """A ``computer_call`` item becomes a ``computer_use`` function call content."""
    from azure.ai.agentserver.responses.models import ComputerAction, OutputItemComputerToolCall

    click_action = ComputerAction({"type": "click"})
    output_item = OutputItemComputerToolCall({
        "type": "computer_call",
        "id": "cc-1",
        "call_id": "call_cc",
        "action": click_action,
        "pending_safety_checks": [],
        "status": "completed",
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "assistant"
    first_content = message.contents[0]
    assert first_content.type == "function_call"
    assert first_content.name == "computer_use"
async def test_computer_call_output(self) -> None:
    """A ``computer_call_output`` item becomes a tool ``function_result`` with the same call id."""
    from azure.ai.agentserver.responses.models import (
        ComputerScreenshotImage,
        OutputItemComputerToolCallOutputResource,
    )

    screenshot = ComputerScreenshotImage({
        "type": "computer_screenshot",
        "image_url": "data:image/png;base64,abc",
    })
    output_item = OutputItemComputerToolCallOutputResource({
        "type": "computer_call_output",
        "call_id": "call_cc",
        "output": screenshot,
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "tool"
    first_content = message.contents[0]
    assert first_content.type == "function_result"
    assert first_content.call_id == "call_cc"
async def test_custom_tool_call(self) -> None:
    """A ``custom_tool_call`` item becomes a function call named after the tool with its input as arguments."""
    from azure.ai.agentserver.responses.models import OutputItemCustomToolCall

    output_item = OutputItemCustomToolCall({
        "type": "custom_tool_call",
        "call_id": "call_ct",
        "name": "my_tool",
        "input": '{"key": "value"}',
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "assistant"
    first_content = message.contents[0]
    assert first_content.type == "function_call"
    assert first_content.name == "my_tool"
    assert first_content.arguments == '{"key": "value"}'
async def test_custom_tool_call_output(self) -> None:
    """A ``custom_tool_call_output`` item becomes a tool ``function_result`` carrying the output."""
    from azure.ai.agentserver.responses.models import OutputItemCustomToolCallOutput

    output_item = OutputItemCustomToolCallOutput({
        "type": "custom_tool_call_output",
        "call_id": "call_ct",
        "output": "result text",
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "tool"
    first_content = message.contents[0]
    assert first_content.type == "function_result"
    assert first_content.result == "result text"
async def test_custom_tool_call_output_with_mcp_call_id_routes_to_mcp_server_tool_result(self) -> None:
    """An ``mcp_``-prefixed ``custom_tool_call_output`` must read back as ``mcp_server_tool_result``.

    A hosted-MCP result written by the host through
    `aoutput_item_custom_tool_call_output` is persisted with its `mcp_*`
    call_id intact. Reading it back has to yield a `mcp_server_tool_result`
    Content rather than a `function_result`, so that the chat-client
    serialize layer handles it as a hosted-MCP result and no orphan
    `function_call_output` is produced.
    """
    from azure.ai.agentserver.responses.models import OutputItemCustomToolCallOutput

    mcp_call_id = "mcp_06b686e11f118cf40169f0e5badb3081979842929d5cf04920"
    output_item = OutputItemCustomToolCallOutput({
        "type": "custom_tool_call_output",
        "call_id": mcp_call_id,
        "output": "found 10 cats",
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "tool"
    assert len(message.contents) == 1
    c = message.contents[0]
    assert c.type == "mcp_server_tool_result", (
        f"expected mcp_server_tool_result for mcp_-prefixed call_id; got {c.type}"
    )
    assert c.call_id == mcp_call_id
async def test_apply_patch_call(self) -> None:
    """An ``apply_patch_call`` output item becomes an assistant ``function_call`` named ``apply_patch``."""
    from azure.ai.agentserver.responses.models import ApplyPatchUpdateFileOperation, OutputItemApplyPatchToolCall

    update_operation = ApplyPatchUpdateFileOperation({
        "type": "update_file",
        "path": "file.py",
        "diff": "+ new line",
    })
    output_item = OutputItemApplyPatchToolCall({
        "type": "apply_patch_call",
        "id": "ap-1",
        "call_id": "call_ap",
        "status": "completed",
        "operation": update_operation,
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "assistant"
    call_content = message.contents[0]
    assert call_content.type == "function_call"
    assert call_content.name == "apply_patch"
async def test_apply_patch_call_output(self) -> None:
    """An ``apply_patch_call_output`` item becomes a tool ``function_result`` carrying the output text."""
    from azure.ai.agentserver.responses.models import OutputItemApplyPatchToolCallOutput

    output_item = OutputItemApplyPatchToolCallOutput({
        "type": "apply_patch_call_output",
        "id": "apo-1",
        "call_id": "call_ap",
        "status": "completed",
        "output": "patch applied",
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "tool"
    result_content = message.contents[0]
    assert result_content.type == "function_result"
    assert result_content.result == "patch applied"
async def test_oauth_consent_request(self) -> None:
    """An ``oauth_consent_request`` output item becomes assistant content that keeps the consent link."""
    from azure.ai.agentserver.responses.models import OAuthConsentRequestOutputItem

    consent_url = "https://example.com/consent"
    output_item = OAuthConsentRequestOutputItem({
        "type": "oauth_consent_request",
        "id": "oauth-1",
        "consent_link": consent_url,
        "server_label": "my_server",
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "assistant"
    consent_content = message.contents[0]
    assert consent_content.type == "oauth_consent_request"
    assert consent_content.consent_link == consent_url
async def test_structured_outputs_dict(self) -> None:
    """A ``structured_outputs`` item with a dict payload becomes assistant text holding that dict as JSON."""
    from azure.ai.agentserver.responses.models import StructuredOutputsOutputItem

    payload = {"answer": 42}
    output_item = StructuredOutputsOutputItem({"type": "structured_outputs", "id": "so-1", "output": payload})

    message = await _output_item_to_message(output_item)

    assert message.role == "assistant"
    text_content = message.contents[0]
    assert text_content.type == "text"
    # Compare parsed JSON so the check does not depend on serializer formatting.
    assert json.loads(text_content.text or "") == {"answer": 42}
async def test_structured_outputs_string(self) -> None:
    """A ``structured_outputs`` item with a string payload becomes assistant text with that same string."""
    from azure.ai.agentserver.responses.models import StructuredOutputsOutputItem

    output_item = StructuredOutputsOutputItem({
        "type": "structured_outputs",
        "id": "so-2",
        "output": "plain text",
    })

    message = await _output_item_to_message(output_item)

    assert message.role == "assistant"
    assert message.contents[0].text == "plain text"
async def test_unsupported_type_raises(self) -> None:
    """An output item with an unrecognised ``type`` makes the converter raise ``ValueError``."""
    from azure.ai.agentserver.responses.models import OutputItem

    unknown_item = OutputItem({"type": "some_unknown_type"})
    expected_error = "Unsupported OutputItem type: some_unknown_type"

    with pytest.raises(ValueError, match=expected_error):
        await _output_item_to_message(unknown_item)
# endregion
# region _item_to_message conversion
class TestItemToMessage:
    """Tests for _item_to_message covering all supported Item types."""

    async def test_message_with_string_content(self) -> None:
        """A ``message`` item with plain string content yields a user message with one text content."""
        from azure.ai.agentserver.responses.models import ItemMessage

        item = ItemMessage({"type": "message", "role": "user", "content": "hello"})
        msg = await _item_to_message(item)
        assert msg is not None
        assert msg.role == "user"
        assert len(msg.contents) == 1
        assert msg.contents[0].type == "text"
        assert msg.contents[0].text == "hello"

    async def test_message_with_input_text_content(self) -> None:
        """A ``message`` item with a single ``input_text`` part yields one content with that text."""
        from azure.ai.agentserver.responses.models import ItemMessage, MessageContentInputTextContent

        item = ItemMessage({
            "type": "message",
            "role": "user",
            "content": [MessageContentInputTextContent({"type": "input_text", "text": "hi there"})],
        })
        msg = await _item_to_message(item)
        assert msg is not None
        assert msg.role == "user"
        assert len(msg.contents) == 1
        assert msg.contents[0].text == "hi there"

    async def test_message_with_multiple_contents(self) -> None:
        """Two ``input_text`` parts yield two contents in the same order."""
        from azure.ai.agentserver.responses.models import ItemMessage, MessageContentInputTextContent

        item = ItemMessage({
            "type": "message",
            "role": "user",
            "content": [
                MessageContentInputTextContent({"type": "input_text", "text": "first"}),
                MessageContentInputTextContent({"type": "input_text", "text": "second"}),
            ],
        })
        msg = await _item_to_message(item)
        assert msg is not None
        assert len(msg.contents) == 2
        assert msg.contents[0].text == "first"
        assert msg.contents[1].text == "second"

    async def test_output_message(self) -> None:
        """An ``output_message`` item with ``output_text`` yields an assistant message with one text content."""
        from azure.ai.agentserver.responses.models import ItemOutputMessage, OutputMessageContentOutputTextContent

        item = ItemOutputMessage({
            "type": "output_message",
            "role": "assistant",
            "content": [OutputMessageContentOutputTextContent({"type": "output_text", "text": "response"})],
            "status": "completed",
            "id": "msg-1",
        })
        msg = await _item_to_message(item)
        assert msg is not None
        assert msg.role == "assistant"
        assert len(msg.contents) == 1
        assert msg.contents[0].type == "text"
        assert msg.contents[0].text == "response"

    async def test_function_call(self) -> None:
        """A ``function_call`` item yields an assistant ``function_call`` keeping call_id, name and arguments."""
        from azure.ai.agentserver.responses.models import ItemFunctionToolCall

        item = ItemFunctionToolCall({
            "type": "function_call",
            "call_id": "call_1",
            "name": "get_weather",
            "arguments": '{"city": "NYC"}',
            "status": "completed",
            "id": "fc-1",
        })
        msg = await _item_to_message(item)
        assert msg is not None
        assert msg.role == "assistant"
        assert msg.contents[0].type == "function_call"
        assert msg.contents[0].call_id == "call_1"
        assert msg.contents[0].name == "get_weather"
        assert msg.contents[0].arguments == '{"city": "NYC"}'

    async def test_function_call_output(self) -> None:
        """A ``function_call_output`` item yields a tool ``function_result`` with the call_id and output."""
        from azure.ai.agentserver.responses.models import FunctionCallOutputItemParam

        item = FunctionCallOutputItemParam({"type": "function_call_output", "call_id": "call_1", "output": "sunny"})
        msg = await _item_to_message(item)  # type: ignore[arg-type]
        assert msg is not None
        assert msg.role == "tool"
        assert msg.contents[0].type == "function_result"
        assert msg.contents[0].call_id == "call_1"
        assert msg.contents[0].result == "sunny"

    async def test_function_call_output_non_string(self) -> None:
        """A non-string ``function_call_output`` output (``42``) is exposed as the string ``"42"``."""
        from azure.ai.agentserver.responses.models import FunctionCallOutputItemParam

        item = FunctionCallOutputItemParam({"type": "function_call_output", "call_id": "call_2", "output": 42})
        msg = await _item_to_message(item)  # type: ignore[arg-type]
        assert msg is not None
        assert msg.role == "tool"
        assert msg.contents[0].result == "42"

    async def test_reasoning_with_summary(self) -> None:
        """A ``reasoning`` item with one summary entry yields an assistant message with that summary text."""
        from azure.ai.agentserver.responses.models import ItemReasoningItem, SummaryTextContent

        item = ItemReasoningItem({
            "type": "reasoning",
            "id": "r-1",
            "summary": [SummaryTextContent({"type": "summary_text", "text": "thinking hard"})],
        })
        msg = await _item_to_message(item)
        assert msg is not None
        assert msg.role == "assistant"
        assert len(msg.contents) == 1
        assert msg.contents[0].text == "thinking hard"

    async def test_reasoning_no_summary(self) -> None:
        """A ``reasoning`` item without a summary still yields an assistant message, with no contents."""
        from azure.ai.agentserver.responses.models import ItemReasoningItem

        item = ItemReasoningItem({"type": "reasoning", "id": "r-2"})
        msg = await _item_to_message(item)
        assert msg is not None
        assert msg.role == "assistant"
        assert msg.contents == []

    async def test_mcp_call(self) -> None:
        """An ``mcp_call`` item yields an assistant ``mcp_server_tool_call`` with server and tool names."""
        from azure.ai.agentserver.responses.models import ItemMcpToolCall

        item = ItemMcpToolCall({
            "type": "mcp_call",
            "id": "mcp-1",
            "server_label": "my_server",
            "name": "search",
            "arguments": '{"q": "test"}',
        })
        msg = await _item_to_message(item)
        assert msg is not None
        assert msg.role == "assistant"
        assert msg.contents[0].type == "mcp_server_tool_call"
        assert msg.contents[0].server_name == "my_server"
        assert msg.contents[0].tool_name == "search"

    async def test_mcp_call_with_output_reconstructs_mcp_result_content(self) -> None:
        """An ``mcp_call`` item carrying ``output`` yields both the call and a ``mcp_server_tool_result``."""
        from azure.ai.agentserver.responses.models import ItemMcpToolCall

        item = ItemMcpToolCall({
            "type": "mcp_call",
            "id": "mcp-1",
            "server_label": "my_server",
            "name": "search",
            "arguments": '{"q": "test"}',
            "output": "found 10 cats",
        })
        msg = await _item_to_message(item)
        assert msg is not None
        assert msg.role == "assistant"
        # Call first, result second, both in the same assistant message.
        assert len(msg.contents) == 2
        assert msg.contents[0].type == "mcp_server_tool_call"
        assert msg.contents[1].type == "mcp_server_tool_result"
        assert msg.contents[1].output == "found 10 cats"

    async def test_mcp_approval_request(self) -> None:
        """An ``mcp_approval_request`` item, with a matching saved request, yields ``function_approval_request``."""
        from azure.ai.agentserver.responses.models import ItemMcpApprovalRequest

        # Seed the approval storage under the same id the item carries ("apr-1").
        storage = InMemoryFunctionApprovalStorage()
        saved = _make_function_approval_request_content(request_id="apr-1")
        await storage.save_approval_request("apr-1", saved)
        item = ItemMcpApprovalRequest({
            "type": "mcp_approval_request",
            "id": "apr-1",
            "server_label": "srv",
            "name": "dangerous_tool",
            "arguments": "{}",
        })
        msg = await _item_to_message(item, approval_storage=storage)
        assert msg is not None
        assert msg.role == "assistant"
        assert msg.contents[0].type == "function_approval_request"

    async def test_mcp_approval_response(self) -> None:
        """An approving ``mcp_approval_response`` item yields a user ``function_approval_response``."""
        from azure.ai.agentserver.responses.models import MCPApprovalResponse

        # Seed the approval storage under the id the response refers to ("apr-1").
        storage = InMemoryFunctionApprovalStorage()
        saved = _make_function_approval_request_content(request_id="apr-1")
        await storage.save_approval_request("apr-1", saved)
        item = MCPApprovalResponse({
            "type": "mcp_approval_response",
            "approval_request_id": "apr-1",
            "approve": True,
        })
        msg = await _item_to_message(item, approval_storage=storage)  # type: ignore[arg-type]
        assert msg is not None
        assert msg.role == "user"
        assert msg.contents[0].type == "function_approval_response"
        assert msg.contents[0].approved is True

    async def test_code_interpreter_call(self) -> None:
        """A ``code_interpreter_call`` item yields an assistant ``code_interpreter_tool_call`` content."""
        from azure.ai.agentserver.responses.models import ItemCodeInterpreterToolCall

        item = ItemCodeInterpreterToolCall({
            "type": "code_interpreter_call",
            "id": "ci-1",
            "status": "completed",
            "container_id": "c-1",
            "code": "print('hi')",
            "outputs": [],
        })
        msg = await _item_to_message(item)
        assert msg is not None
        assert msg.role == "assistant"
        assert msg.contents[0].type == "code_interpreter_tool_call"

    async def test_image_generation_call(self) -> None:
        """An ``image_generation_call`` item yields an assistant ``image_generation_tool_call`` content."""
        from azure.ai.agentserver.responses.models import ItemImageGenToolCall

        item = ItemImageGenToolCall({"type": "image_generation_call", "id": "ig-1", "status": "completed"})
        msg = await _item_to_message(item)
        assert msg is not None
        assert msg.role == "assistant"
        assert msg.contents[0].type == "image_generation_tool_call"

    async def test_shell_call(self) -> None:
        """A ``shell_call`` item yields an assistant ``shell_tool_call`` keeping commands and call_id."""
        from azure.ai.agentserver.responses.models import FunctionShellAction, FunctionShellCallItemParam

        item = FunctionShellCallItemParam({
            "type": "shell_call",
            "call_id": "call_sc",
            "action": FunctionShellAction({"commands": ["ls", "-la"], "timeout_ms": 5000, "max_output_length": 1024}),
            "status": "in_progress",
        })
        msg = await _item_to_message(item)  # type: ignore[arg-type]
        assert msg is not None
        assert msg.role == "assistant"
        assert msg.contents[0].type == "shell_tool_call"
        assert msg.contents[0].commands == ["ls", "-la"]
        assert msg.contents[0].call_id == "call_sc"

    async def test_shell_call_output(self) -> None:
        """A ``shell_call_output`` item yields a tool ``shell_tool_result`` that keeps the call_id."""
        from azure.ai.agentserver.responses.models import (
            FunctionShellCallOutputContent,
            FunctionShellCallOutputExitOutcome,
            FunctionShellCallOutputItemParam,
        )

        item = FunctionShellCallOutputItemParam({
            "type": "shell_call_output",
            "call_id": "call_sc",
            "output": [
                FunctionShellCallOutputContent({
                    "stdout": "file.txt",
                    "stderr": "",
                    "outcome": FunctionShellCallOutputExitOutcome({"exit_code": 0}),
                })
            ],
            "max_output_length": 1024,
        })
        msg = await _item_to_message(item)  # type: ignore[arg-type]
        assert msg is not None
        assert msg.role == "tool"
        assert msg.contents[0].type == "shell_tool_result"
        assert msg.contents[0].call_id == "call_sc"

    async def test_local_shell_call(self) -> None:
        """A ``local_shell_call`` item yields an assistant ``shell_tool_call`` with the exec command list."""
        from azure.ai.agentserver.responses.models import ItemLocalShellToolCall, LocalShellExecAction

        item = ItemLocalShellToolCall({
            "type": "local_shell_call",
            "id": "lsc-1",
            "call_id": "call_lsc",
            "action": LocalShellExecAction({"type": "exec", "command": ["echo", "hello"], "env": {}}),
            "status": "completed",
        })
        msg = await _item_to_message(item)
        assert msg is not None
        assert msg.role == "assistant"
        assert msg.contents[0].type == "shell_tool_call"
        assert msg.contents[0].commands == ["echo", "hello"]

    async def test_local_shell_call_output(self) -> None:
        """A ``local_shell_call_output`` item yields a tool ``shell_tool_result`` content."""
        from azure.ai.agentserver.responses.models import ItemLocalShellToolCallOutput

        item = ItemLocalShellToolCallOutput({
            "type": "local_shell_call_output",
            "id": "lsco-1",
            "output": "hello\n",
        })
        msg = await _item_to_message(item)
        assert msg is not None
        assert msg.role == "tool"
        assert msg.contents[0].type == "shell_tool_result"

    async def test_file_search_call(self) -> None:
        """A ``file_search_call`` item yields a ``function_call`` named ``file_search`` with the query in its args."""
        from azure.ai.agentserver.responses.models import ItemFileSearchToolCall

        item = ItemFileSearchToolCall({
            "type": "file_search_call",
            "id": "fs-1",
            "status": "completed",
            "queries": ["what is AI"],
        })
        msg = await _item_to_message(item)
        assert msg is not None
        assert msg.role == "assistant"
        assert msg.contents[0].type == "function_call"
        assert msg.contents[0].name == "file_search"
        # Only checks that the quoted query appears somewhere in the arguments string.
        assert '"what is AI"' in (msg.contents[0].arguments or "")

    async def test_web_search_call(self) -> None:
        """A ``web_search_call`` item yields an assistant ``function_call`` named ``web_search``."""
        from azure.ai.agentserver.responses.models import ItemWebSearchToolCall

        item = ItemWebSearchToolCall({
            "type": "web_search_call",
            "id": "ws-1",
            "status": "completed",
        })
        msg = await _item_to_message(item)
        assert msg is not None
        assert msg.role == "assistant"
        assert msg.contents[0].type == "function_call"
        assert msg.contents[0].name == "web_search"

    async def test_computer_call(self) -> None:
        """A ``computer_call`` item yields an assistant ``function_call`` named ``computer_use``."""
        from azure.ai.agentserver.responses.models import ComputerAction, ItemComputerToolCall

        item = ItemComputerToolCall({
            "type": "computer_call",
            "id": "cc-1",
            "call_id": "call_cc",
            "action": ComputerAction({"type": "click"}),
            "pending_safety_checks": [],
            "status": "completed",
        })
        msg = await _item_to_message(item)
        assert msg is not None
        assert msg.role == "assistant"
        assert msg.contents[0].type == "function_call"
        assert msg.contents[0].name == "computer_use"

    async def test_computer_call_output(self) -> None:
        """A ``computer_call_output`` item yields a tool ``function_result`` that keeps the call_id."""
        from azure.ai.agentserver.responses.models import ComputerCallOutputItemParam, ComputerScreenshotImage

        item = ComputerCallOutputItemParam({
            "type": "computer_call_output",
            "call_id": "call_cc",
            "output": ComputerScreenshotImage({
                "type": "computer_screenshot",
                "image_url": "data:image/png;base64,abc",
            }),
        })
        msg = await _item_to_message(item)  # type: ignore[arg-type]
        assert msg is not None
        assert msg.role == "tool"
        assert msg.contents[0].type == "function_result"
        assert msg.contents[0].call_id == "call_cc"

    async def test_custom_tool_call(self) -> None:
        """A ``custom_tool_call`` item yields an assistant ``function_call`` with its name and input as arguments."""
        from azure.ai.agentserver.responses.models import ItemCustomToolCall

        item = ItemCustomToolCall({
            "type": "custom_tool_call",
            "call_id": "call_ct",
            "name": "my_tool",
            "input": '{"key": "value"}',
        })
        msg = await _item_to_message(item)
        assert msg is not None
        assert msg.role == "assistant"
        assert msg.contents[0].type == "function_call"
        assert msg.contents[0].name == "my_tool"
        assert msg.contents[0].arguments == '{"key": "value"}'

    async def test_custom_tool_call_output(self) -> None:
        """A ``custom_tool_call_output`` item with a plain call id yields a tool ``function_result``."""
        from azure.ai.agentserver.responses.models import ItemCustomToolCallOutput

        item = ItemCustomToolCallOutput({
            "type": "custom_tool_call_output",
            "call_id": "call_ct",
            "output": "result text",
        })
        msg = await _item_to_message(item)
        assert msg is not None
        assert msg.role == "tool"
        assert msg.contents[0].type == "function_result"
        assert msg.contents[0].result == "result text"

    async def test_custom_tool_call_output_non_string(self) -> None:
        """A non-string ``custom_tool_call_output`` output (``123``) is exposed as the string ``"123"``."""
        from azure.ai.agentserver.responses.models import ItemCustomToolCallOutput

        item = ItemCustomToolCallOutput({
            "type": "custom_tool_call_output",
            "call_id": "call_ct2",
            "output": 123,
        })
        msg = await _item_to_message(item)
        assert msg is not None
        assert msg.contents[0].result == "123"

    async def test_custom_tool_call_output_with_mcp_call_id_routes_to_mcp_server_tool_result(self) -> None:
        """Issue #5546: input items carrying a hosted-MCP result (from a
        prior turn that the framework wrote via
        `aoutput_item_custom_tool_call_output`) must reconstruct as a
        `mcp_server_tool_result` Content, not `function_result`. Otherwise
        the chat-client serialize layer turns it into an orphan
        `function_call_output` with `mcp_*` call_id and the Responses API
        rejects the next turn.
        """
        from azure.ai.agentserver.responses.models import ItemCustomToolCallOutput

        item = ItemCustomToolCallOutput({
            "type": "custom_tool_call_output",
            "call_id": "mcp_06b686e11f118cf40169f0e5badb3081979842929d5cf04920",
            "output": "found 10 cats",
        })
        msg = await _item_to_message(item)
        assert msg is not None
        assert msg.role == "tool"
        assert len(msg.contents) == 1
        c = msg.contents[0]
        assert c.type == "mcp_server_tool_result", (
            f"expected mcp_server_tool_result for mcp_-prefixed call_id; got {c.type}"
        )
        assert c.call_id == "mcp_06b686e11f118cf40169f0e5badb3081979842929d5cf04920"

    async def test_apply_patch_call(self) -> None:
        """An ``apply_patch_call`` item yields an assistant ``function_call`` named ``apply_patch``."""
        from azure.ai.agentserver.responses.models import ApplyPatchToolCallItemParam, ApplyPatchUpdateFileOperation

        item = ApplyPatchToolCallItemParam({
            "type": "apply_patch_call",
            "call_id": "call_ap",
            "operation": ApplyPatchUpdateFileOperation({
                "type": "update_file",
                "path": "file.py",
                "diff": "+ new line",
            }),
        })
        msg = await _item_to_message(item)  # type: ignore[arg-type]
        assert msg is not None
        assert msg.role == "assistant"
        assert msg.contents[0].type == "function_call"
        assert msg.contents[0].name == "apply_patch"

    async def test_apply_patch_call_output(self) -> None:
        """An ``apply_patch_call_output`` item yields a tool ``function_result`` carrying the output text."""
        from azure.ai.agentserver.responses.models import ApplyPatchToolCallOutputItemParam

        item = ApplyPatchToolCallOutputItemParam({
            "type": "apply_patch_call_output",
            "call_id": "call_ap",
            "output": "patch applied",
        })
        msg = await _item_to_message(item)  # type: ignore[arg-type]
        assert msg is not None
        assert msg.role == "tool"
        assert msg.contents[0].type == "function_result"
        assert msg.contents[0].result == "patch applied"

    async def test_unsupported_type_raises(self) -> None:
        """An item with an unrecognised ``type`` makes ``_item_to_message`` raise ``ValueError``."""
        from azure.ai.agentserver.responses.models import Item

        item = Item({"type": "some_unknown_type"})
        with pytest.raises(ValueError, match="Unsupported Item type: some_unknown_type"):
            await _item_to_message(item)
# endregion
# region Multi-turn with mixed content
async def _post_json(
    server: ResponsesHostServer,
    payload: dict[str, Any],
) -> httpx.Response:
    """POST ``payload`` as JSON to ``/responses`` on the in-process ASGI app and return the response."""
    # ASGITransport routes the request straight into ``server``; no socket is opened.
    async with httpx.AsyncClient(
        transport=httpx.ASGITransport(app=server),
        base_url="http://test",
    ) as http_client:
        response = await http_client.post("/responses", json=payload)
        return response
def _make_multi_response_agent(
    responses: list[AgentResponse],
    stream_updates_list: list[list[AgentResponseUpdate]] | None = None,
) -> MagicMock:
    """Build a ``RawAgent``-spec mock whose ``run`` answers successive calls by position.

    Every call to ``run`` consumes the next index. Without ``stream_updates_list``,
    ``run`` is an ``AsyncMock`` returning ``responses[index]``. With it, ``run`` is a
    plain ``MagicMock``: calls made with a truthy ``stream`` keyword return a
    ``ResponseStream`` over ``stream_updates_list[index]``, all other calls return a
    coroutine resolving to ``responses[index]``.
    """
    mock_agent = MagicMock(spec=RawAgent)
    mock_agent.id = "test-agent"
    mock_agent.name = "Test Agent"
    mock_agent.description = "A mock agent for testing"
    mock_agent.context_providers = []

    calls_made = 0

    def _claim_index() -> int:
        # Hand out the current position and advance the shared counter.
        nonlocal calls_made
        claimed = calls_made
        calls_made += 1
        return claimed

    async def _response_at(position: int) -> AgentResponse:
        return responses[position]

    async def _run_plain(*args: Any, **kwargs: Any) -> AgentResponse:
        return responses[_claim_index()]

    async def _iter_updates(updates: list[AgentResponseUpdate]) -> AsyncIterator[AgentResponseUpdate]:
        for single_update in updates:
            yield single_update

    def _run_dispatch(*args: Any, **kwargs: Any) -> Any:
        position = _claim_index()
        wants_stream = kwargs.get("stream")
        if wants_stream and stream_updates_list is not None:
            return ResponseStream(_iter_updates(stream_updates_list[position]))  # type: ignore
        if not wants_stream:
            # Non-streaming callers await the result, so hand back a coroutine.
            return _response_at(position)
        # Not reachable while this dispatcher is only installed when stream_updates_list is provided.
        raise NotImplementedError("Streaming not configured for this call index")

    if stream_updates_list is None:
        mock_agent.run = AsyncMock(side_effect=_run_plain)
    else:
        mock_agent.run = MagicMock(side_effect=_run_dispatch)
    return mock_agent
class TestMultiTurnMixedContent:
"""End-to-end multi-turn tests with mixed text and non-text content types."""
async def test_text_and_image_input_single_turn(self) -> None:
    """A user message with text plus an image URL reaches the agent as text and uri contents."""
    assistant_message = Message(role="assistant", contents=[Content.from_text("I see a cat!")])
    agent = _make_agent(response=AgentResponse(messages=[assistant_message]))
    server = _make_server(agent)

    user_item = {
        "type": "message",
        "role": "user",
        "content": [
            {"type": "input_text", "text": "Describe this animal"},
            {"type": "input_image", "image_url": "https://example.com/cat.jpg"},
        ],
    }
    http_response = await _post_json(
        server,
        {"model": "test-model", "input": [user_item], "stream": False},
    )

    assert http_response.status_code == 200
    assert http_response.json()["status"] == "completed"

    # Inspect what the agent was actually handed: one user message, text then image.
    received = agent.run.call_args.kwargs["messages"]
    assert len(received) == 1
    user_message = received[0]
    assert user_message.role == "user"
    assert len(user_message.contents) == 2
    text_part, image_part = user_message.contents
    assert text_part.type == "text"
    assert text_part.text == "Describe this animal"
    assert image_part.type == "uri"
    assert image_part.uri == "https://example.com/cat.jpg"
async def test_text_and_file_input_single_turn(self) -> None:
    """A user message with text plus a file URL reaches the agent as text and uri contents."""
    assistant_message = Message(role="assistant", contents=[Content.from_text("File received")])
    agent = _make_agent(response=AgentResponse(messages=[assistant_message]))
    server = _make_server(agent)

    user_item = {
        "type": "message",
        "role": "user",
        "content": [
            {"type": "input_text", "text": "Summarize this document"},
            {"type": "input_file", "file_url": "https://example.com/doc.pdf", "filename": "doc.pdf"},
        ],
    }
    http_response = await _post_json(
        server,
        {"model": "test-model", "input": [user_item], "stream": False},
    )

    assert http_response.status_code == 200
    assert http_response.json()["status"] == "completed"

    received = agent.run.call_args.kwargs["messages"]
    assert len(received) == 1
    assert len(received[0].contents) == 2
    text_part, file_part = received[0].contents
    assert text_part.type == "text"
    assert text_part.text == "Summarize this document"
    assert file_part.type == "uri"
    assert file_part.uri == "https://example.com/doc.pdf"
async def test_text_and_file_data_input_single_turn(self) -> None:
    """A user message with text plus inline PDF ``file_data`` reaches the agent as text and data contents."""
    assistant_message = Message(role="assistant", contents=[Content.from_text("File received")])
    agent = _make_agent(response=AgentResponse(messages=[assistant_message]))
    server = _make_server(agent)

    pdf_data_uri = "data:application/pdf;base64,JVBERi0xLjQ="
    user_item = {
        "type": "message",
        "role": "user",
        "content": [
            {"type": "input_text", "text": "Summarize this document"},
            {
                "type": "input_file",
                "file_data": pdf_data_uri,
                "filename": "doc.pdf",
            },
        ],
    }
    http_response = await _post_json(
        server,
        {"model": "test-model", "input": [user_item], "stream": False},
    )

    assert http_response.status_code == 200
    assert http_response.json()["status"] == "completed"

    received = agent.run.call_args.kwargs["messages"]
    assert len(received) == 1
    assert len(received[0].contents) == 2
    text_part, file_part = received[0].contents
    assert text_part.type == "text"
    assert text_part.text == "Summarize this document"
    # The non-text data URI is expected to pass through untouched.
    assert file_part.type == "data"
    assert file_part.uri == "data:application/pdf;base64,JVBERi0xLjQ="
async def test_text_mime_file_data_decoded(self) -> None:
    """A ``text/plain`` base64 ``file_data`` reaches the agent as decoded text prefixed with the filename."""
    assistant_message = Message(role="assistant", contents=[Content.from_text("Got it")])
    agent = _make_agent(response=AgentResponse(messages=[assistant_message]))
    server = _make_server(agent)

    import base64

    encoded = base64.b64encode(b"Hello, world!").decode()
    file_part = {
        "type": "input_file",
        "file_data": f"data:text/plain;base64,{encoded}",
        "filename": "greeting.txt",
    }
    user_item = {"type": "message", "role": "user", "content": [file_part]}
    http_response = await _post_json(
        server,
        {"model": "test-model", "input": [user_item], "stream": False},
    )

    assert http_response.status_code == 200

    received = agent.run.call_args.kwargs["messages"]
    assert len(received) == 1
    decoded_content = received[0].contents[0]
    assert decoded_content.type == "text"
    assert decoded_content.text == "[File: greeting.txt]\nHello, world!"
async def test_text_mime_file_data_invalid_base64_falls_through(self) -> None:
    """A ``text/plain`` ``file_data`` with undecodable base64 reaches the agent as the original data URI."""
    assistant_message = Message(role="assistant", contents=[Content.from_text("Got it")])
    agent = _make_agent(response=AgentResponse(messages=[assistant_message]))
    server = _make_server(agent)

    broken_data_uri = "data:text/plain;base64,!!!invalid!!!"
    file_part = {
        "type": "input_file",
        "file_data": broken_data_uri,
        "filename": "bad.txt",
    }
    user_item = {"type": "message", "role": "user", "content": [file_part]}
    http_response = await _post_json(
        server,
        {"model": "test-model", "input": [user_item], "stream": False},
    )

    assert http_response.status_code == 200

    received = agent.run.call_args.kwargs["messages"]
    assert len(received) == 1
    passthrough_content = received[0].contents[0]
    assert passthrough_content.type == "data"
    assert passthrough_content.uri == "data:text/plain;base64,!!!invalid!!!"
async def test_mixed_text_and_image_input(self) -> None:
    """One user message mixing text and an image URL reaches the agent with both contents in order."""
    assistant_message = Message(role="assistant", contents=[Content.from_text("Got it!")])
    agent = _make_agent(response=AgentResponse(messages=[assistant_message]))
    server = _make_server(agent)

    user_item = {
        "type": "message",
        "role": "user",
        "content": [
            {"type": "input_text", "text": "What's in this image?"},
            {"type": "input_image", "image_url": "https://example.com/photo.jpg"},
        ],
    }
    http_response = await _post_json(
        server,
        {"model": "test-model", "input": [user_item], "stream": False},
    )

    assert http_response.status_code == 200
    assert http_response.json()["status"] == "completed"

    received = agent.run.call_args.kwargs["messages"]
    assert len(received) == 1
    assert len(received[0].contents) == 2
    text_part, image_part = received[0].contents
    assert text_part.type == "text"
    assert text_part.text == "What's in this image?"
    assert image_part.type == "uri"
    assert image_part.uri == "https://example.com/photo.jpg"
async def test_function_call_items_in_input(self) -> None:
    """``function_call`` and ``function_call_output`` input items reach the agent as assistant/tool messages."""
    assistant_message = Message(role="assistant", contents=[Content.from_text("Weather is sunny!")])
    agent = _make_agent(response=AgentResponse(messages=[assistant_message]))
    server = _make_server(agent)

    question_item = {"type": "message", "role": "user", "content": "What's the weather?"}
    call_item = {
        "type": "function_call",
        "id": "fc-1",
        "call_id": "call_1",
        "name": "get_weather",
        "arguments": '{"city": "NYC"}',
        "status": "completed",
    }
    result_item = {"type": "function_call_output", "call_id": "call_1", "output": "sunny, 72F"}
    http_response = await _post_json(
        server,
        {
            "model": "test-model",
            "input": [question_item, call_item, result_item],
            "stream": False,
        },
    )

    assert http_response.status_code == 200
    assert http_response.json()["status"] == "completed"

    # One message per input item, in input order.
    received = agent.run.call_args.kwargs["messages"]
    assert len(received) == 3
    user_message, call_message, result_message = received

    assert user_message.role == "user"
    assert user_message.contents[0].type == "text"

    assert call_message.role == "assistant"
    assert call_message.contents[0].type == "function_call"
    assert call_message.contents[0].name == "get_weather"

    assert result_message.role == "tool"
    assert result_message.contents[0].type == "function_result"
    assert result_message.contents[0].result == "sunny, 72F"
async def test_multi_turn_text_then_text_with_image(self) -> None:
    """A text-only turn followed by a text+image turn chained through ``previous_response_id``."""
    agent = _make_multi_response_agent([
        AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text("Send me an image")])]),
        AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text("Nice cat!")])]),
    ])
    server = _make_server(agent)

    # First turn: plain text only.
    first_response = await _post(server, input_text="Hello", stream=False)
    assert first_response.status_code == 200
    previous_id = first_response.json()["id"]

    # Second turn: text and image, linked to the first turn.
    user_item = {
        "type": "message",
        "role": "user",
        "content": [
            {"type": "input_text", "text": "Here is my cat photo"},
            {"type": "input_image", "image_url": "https://example.com/cat.jpg"},
        ],
    }
    second_response = await _post_json(
        server,
        {
            "model": "test-model",
            "input": [user_item],
            "stream": False,
            "previous_response_id": previous_id,
        },
    )
    assert second_response.status_code == 200
    assert second_response.json()["status"] == "completed"

    # The second agent call should see turn-1 history followed by the new input.
    second_call_messages = agent.run.call_args_list[1].kwargs["messages"]
    assert len(second_call_messages) >= 2

    # The newest message is the text+image input.
    newest = second_call_messages[-1]
    assert newest.role == "user"
    assert len(newest.contents) == 2
    text_part, image_part = newest.contents
    assert text_part.type == "text"
    assert text_part.text == "Here is my cat photo"
    assert image_part.type == "uri"
    assert image_part.uri == "https://example.com/cat.jpg"

    # Everything before it is history, which must contain the turn-1 assistant reply.
    assistant_texts = []
    for earlier in second_call_messages[:-1]:
        if earlier.role != "assistant":
            continue
        for part in earlier.contents:
            if part.type == "text":
                assistant_texts.append(part.text)
    assert "Send me an image" in assistant_texts
async def test_multi_turn_function_call_in_history(self) -> None:
    """A function call and its result from turn 1 are replayed to the agent as history in turn 2."""
    turn_one = AgentResponse(
        messages=[
            Message(
                role="assistant",
                contents=[Content.from_function_call("call_1", "search", arguments='{"q": "cats"}')],
            ),
            Message(role="tool", contents=[Content.from_function_result("call_1", result="found 10 cats")]),
            Message(role="assistant", contents=[Content.from_text("I found 10 cats!")]),
        ]
    )
    turn_two = AgentResponse(
        messages=[Message(role="assistant", contents=[Content.from_text("Here are more details")])]
    )
    agent = _make_multi_response_agent([turn_one, turn_two])
    server = _make_server(agent)

    # First turn.
    first_response = await _post(server, input_text="Search for cats", stream=False)
    assert first_response.status_code == 200
    first_body = first_response.json()
    previous_id = first_body["id"]

    # Turn 1 output must expose the call, its output, and the final message.
    output_types = [entry["type"] for entry in first_body["output"]]
    assert "function_call" in output_types
    assert "function_call_output" in output_types
    assert "message" in output_types

    # Second turn, chained to the first.
    second_response = await _post_json(
        server,
        {
            "model": "test-model",
            "input": "Tell me more",
            "stream": False,
            "previous_response_id": previous_id,
        },
    )
    assert second_response.status_code == 200
    assert second_response.json()["status"] == "completed"

    # History handed to the second call must include assistant and tool messages.
    second_call_messages = agent.run.call_args_list[1].kwargs["messages"]
    seen_roles = [entry.role for entry in second_call_messages]
    assert "assistant" in seen_roles
    assert "tool" in seen_roles

    # The replayed function call is attached to an assistant message.
    replayed_calls = []
    for entry in second_call_messages:
        if entry.role == "assistant":
            replayed_calls.extend(part for part in entry.contents if part.type == "function_call")
    assert len(replayed_calls) >= 1
    assert replayed_calls[0].name == "search"
async def test_hosted_mcp_call_round_trip_does_not_orphan_function_call_output(self) -> None:
    """A hosted MCP call and result from turn 1 are replayed in turn 2 as MCP contents, not ``function_result``."""
    mcp_call = Content.from_mcp_server_tool_call(
        call_id="mcp_abc123",
        tool_name="search",
        server_name="api_specs",
        arguments='{"q": "cats"}',
    )
    mcp_result = Content.from_mcp_server_tool_result(
        call_id="mcp_abc123",
        output=[Content.from_text(text="found 10 cats")],
    )
    turn_one = AgentResponse(
        messages=[
            Message(role="assistant", contents=[mcp_call]),
            Message(role="tool", contents=[mcp_result]),
            Message(role="assistant", contents=[Content.from_text("I found 10 cats!")]),
        ]
    )
    turn_two = AgentResponse(
        messages=[Message(role="assistant", contents=[Content.from_text("Here are more details")])]
    )
    agent = _make_multi_response_agent([turn_one, turn_two])
    server = _make_server(agent)

    first_response = await _post(server, input_text="Search for cats", stream=False)
    assert first_response.status_code == 200
    first_body = first_response.json()
    previous_id = first_body["id"]

    # The MCP exchange is emitted as an mcp_call item, with no separate custom tool output item.
    output_types = [entry["type"] for entry in first_body["output"]]
    assert "mcp_call" in output_types
    assert "custom_tool_call_output" not in output_types

    second_response = await _post_json(
        server,
        {
            "model": "test-model",
            "input": "Tell me more",
            "stream": False,
            "previous_response_id": previous_id,
        },
    )
    assert second_response.status_code == 200
    assert second_response.json()["status"] == "completed"

    second_call_messages = agent.run.call_args_list[1].kwargs["messages"]

    def _contents_of(kind: str) -> list[Any]:
        # Flatten every content of the given type across all replayed messages.
        return [part for entry in second_call_messages for part in entry.contents if part.type == kind]

    mcp_call_contents = _contents_of("mcp_server_tool_call")
    mcp_result_contents = _contents_of("mcp_server_tool_result")
    function_result_contents = _contents_of("function_result")

    assert len(mcp_call_contents) >= 1
    assert len(mcp_result_contents) >= 1
    # No function_result may carry the MCP call id; both MCP sides must carry it.
    assert all((part.call_id or "") != "mcp_abc123" for part in function_result_contents)
    assert any((part.call_id or "") == "mcp_abc123" for part in mcp_call_contents)
    assert any((part.call_id or "") == "mcp_abc123" for part in mcp_result_contents)
async def test_multi_turn_reasoning_in_history(self) -> None:
    """Turn 1 produces reasoning + text; turn 2 must see that turn in its history.

    The first agent response holds a reasoning content followed by a text
    content, which the server emits as ``reasoning`` and ``message`` output
    items. The second request chains onto it through ``previous_response_id``;
    the messages handed to the agent on that call must include the assistant
    text from turn 1 in addition to the new input.
    """
    agent = _make_multi_response_agent([
        AgentResponse(
            messages=[
                Message(
                    role="assistant",
                    contents=[
                        Content.from_text_reasoning(text="Let me think about this..."),
                        Content.from_text("The answer is 42"),
                    ],
                ),
            ]
        ),
        AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text("Indeed, it is 42")])]),
    ])
    server = _make_server(agent)

    # Turn 1
    resp1 = await _post(server, input_text="What is the answer?", stream=False)
    assert resp1.status_code == 200
    body1 = resp1.json()
    response_id = body1["id"]
    types1 = [item["type"] for item in body1["output"]]
    assert "reasoning" in types1
    assert "message" in types1

    # Turn 2
    resp2 = await _post_json(
        server,
        {
            "model": "test-model",
            "input": "Are you sure?",
            "stream": False,
            "previous_response_id": response_id,
        },
    )
    assert resp2.status_code == 200
    assert resp2.json()["status"] == "completed"

    # Verify history includes the text from turn 1.
    second_call_messages = agent.run.call_args_list[1].kwargs["messages"]
    assert len(second_call_messages) >= 2  # history + new input
    # A bare length check would also pass if unrelated messages were replayed,
    # so pin the assistant text that turn 1 actually produced.
    assistant_texts = [
        c.text for m in second_call_messages if m.role == "assistant" for c in m.contents if c.type == "text"
    ]
    assert "The answer is 42" in assistant_texts
    # NOTE(review): the replayed reasoning content is not asserted because its
    # content type after the round trip is not visible from this test - confirm
    # against the conversion code and extend if appropriate.
async def test_multi_turn_with_mixed_content_and_streaming(self) -> None:
    """A non-streaming first turn is followed by a streaming turn carrying an image.

    The second turn chains onto the first via ``previous_response_id`` and
    must produce a well-formed SSE stream whose accumulated text equals the
    concatenation of the streamed updates.
    """
    streamed_chunks = [
        AgentResponseUpdate(contents=[Content.from_text(piece)], role="assistant") for piece in ("I see ", "a cat!")
    ]
    opening_turn = AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text("Send me an image")])])
    agent = _make_multi_response_agent(
        # The second response and the first update list are placeholders:
        # turn 1 is non-streaming and turn 2 is streaming.
        responses=[opening_turn, AgentResponse(messages=[])],
        stream_updates_list=[[], streamed_chunks],
    )
    server = _make_server(agent)

    # Turn 1: plain text, non-streaming.
    first = await _post(server, input_text="Hello", stream=False)
    assert first.status_code == 200
    response_id = first.json()["id"]

    # Turn 2: text + image input, streamed back.
    image_message = {
        "type": "message",
        "role": "user",
        "content": [
            {"type": "input_text", "text": "Describe this:"},
            {"type": "input_image", "image_url": "https://example.com/cat.jpg"},
        ],
    }
    second = await _post_json(
        server,
        {
            "model": "test-model",
            "input": [image_message],
            "stream": True,
            "previous_response_id": response_id,
        },
    )
    assert second.status_code == 200
    assert "text/event-stream" in second.headers["content-type"]

    events = _parse_sse_events(second.text)
    event_types = _sse_event_types(events)
    assert event_types[0] == "response.created"
    assert event_types[-1] == "response.completed"
    assert "response.output_text.delta" in event_types

    # Exactly one "done" event, carrying the fully accumulated text.
    done_events = [evt for evt in events if evt["event"] == "response.output_text.done"]
    assert len(done_events) == 1
    assert done_events[0]["data"]["text"] == "I see a cat!"
async def test_text_with_mcp_call_items(self) -> None:
    """A user message followed by an ``mcp_call`` input item reaches the agent as two messages."""
    canned = AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text("MCP result received")])])
    agent = _make_agent(response=canned)
    server = _make_server(agent)

    input_items = [
        {"type": "message", "role": "user", "content": "Search using MCP"},
        {
            "type": "mcp_call",
            "id": "mcp-1",
            "server_label": "my_server",
            "name": "search",
            "arguments": '{"query": "test"}',
        },
    ]
    resp = await _post_json(server, {"model": "test-model", "input": input_items, "stream": False})
    assert resp.status_code == 200
    assert resp.json()["status"] == "completed"

    received = agent.run.call_args.kwargs["messages"]
    assert len(received) == 2

    # First message: the user's text.
    user_msg = received[0]
    assert user_msg.role == "user"
    assert user_msg.contents[0].type == "text"
    assert user_msg.contents[0].text == "Search using MCP"

    # Second message: the MCP call, attributed to the assistant.
    mcp_msg = received[1]
    assert mcp_msg.role == "assistant"
    mcp_content = mcp_msg.contents[0]
    assert mcp_content.type == "mcp_server_tool_call"
    assert mcp_content.server_name == "my_server"
    assert mcp_content.tool_name == "search"
async def test_three_turn_conversation_with_mixed_content(self) -> None:
    """Chain three turns (text, function call, image input) and inspect turn 3's history.

    Each turn references the previous one through ``previous_response_id``.
    The third agent invocation must receive the earlier turns as history
    (including the ``analyze`` function call from turn 2) and end with the
    new user message that carries the image.
    """
    greeting_turn = AgentResponse(
        messages=[Message(role="assistant", contents=[Content.from_text("Hello! How can I help?")])]
    )
    analysis_turn = AgentResponse(
        messages=[
            Message(
                role="assistant",
                contents=[Content.from_function_call("call_1", "analyze", arguments='{"mode": "deep"}')],
            ),
            Message(role="tool", contents=[Content.from_function_result("call_1", result="analysis complete")]),
            Message(role="assistant", contents=[Content.from_text("Analysis done!")]),
        ]
    )
    image_turn = AgentResponse(
        messages=[Message(role="assistant", contents=[Content.from_text("The image shows a chart")])]
    )
    agent = _make_multi_response_agent([greeting_turn, analysis_turn, image_turn])
    server = _make_server(agent)

    # Turn 1: text
    first = await _post(server, input_text="Hi", stream=False)
    assert first.status_code == 200
    first_id = first.json()["id"]

    # Turn 2: text, referencing turn 1
    second = await _post_json(
        server,
        {
            "model": "test-model",
            "input": "Analyze something",
            "stream": False,
            "previous_response_id": first_id,
        },
    )
    assert second.status_code == 200
    second_id = second.json()["id"]

    # Turn 3: image input, referencing turn 2
    image_message = {
        "type": "message",
        "role": "user",
        "content": [
            {"type": "input_text", "text": "What about this image?"},
            {"type": "input_image", "image_url": "https://example.com/chart.png"},
        ],
    }
    third = await _post_json(
        server,
        {
            "model": "test-model",
            "input": [image_message],
            "stream": False,
            "previous_response_id": second_id,
        },
    )
    assert third.status_code == 200
    assert third.json()["status"] == "completed"

    # Turn 3 must see: turn-1 assistant text, turn-2 function_call,
    # function_call_output and text, plus the new text + image input.
    received = agent.run.call_args_list[2].kwargs["messages"]
    assert len(received) >= 5

    # The newest message is the user's and carries the image as a URI content.
    newest = received[-1]
    assert newest.role == "user"
    image_parts = [part for part in newest.contents if part.type == "uri"]
    assert len(image_parts) == 1
    assert image_parts[0].uri == "https://example.com/chart.png"

    # The history (everything before the newest message) includes turn 2's function call.
    history_calls = []
    for msg in received[:-1]:
        if msg.role != "assistant":
            continue
        history_calls.extend(part for part in msg.contents if part.type == "function_call")
    assert any(call.name == "analyze" for call in history_calls)
async def test_input_with_hosted_file_image(self) -> None:
    """An ``input_image`` part that carries a ``file_id`` reaches the agent as hosted-file content."""
    canned = AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text("Image analyzed")])])
    agent = _make_agent(response=canned)
    server = _make_server(agent)

    user_parts = [
        {"type": "input_text", "text": "Analyze this image"},
        {"type": "input_image", "file_id": "file-abc123"},
    ]
    payload = {
        "model": "test-model",
        "input": [{"type": "message", "role": "user", "content": user_parts}],
        "stream": False,
    }
    resp = await _post_json(server, payload)
    assert resp.status_code == 200
    assert resp.json()["status"] == "completed"

    received = agent.run.call_args.kwargs["messages"]
    assert len(received) == 1
    parts = received[0].contents
    assert len(parts) == 2

    text_part = parts[0]
    assert text_part.type == "text"
    assert text_part.text == "Analyze this image"

    file_part = parts[1]
    assert file_part.type == "hosted_file"
    assert file_part.file_id == "file-abc123"
async def test_multi_turn_text_and_image_then_text_and_file(self) -> None:
    """Turn 1 sends text + image, turn 2 sends text + file and must see turn 1's reply as history."""
    agent = _make_multi_response_agent([
        AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text(reply)])])
        for reply in ("I see a landscape", "Document summarized")
    ])
    server = _make_server(agent)

    # Turn 1: text + image
    photo_message = {
        "type": "message",
        "role": "user",
        "content": [
            {"type": "input_text", "text": "What is in this photo?"},
            {"type": "input_image", "image_url": "https://example.com/landscape.jpg"},
        ],
    }
    first = await _post_json(server, {"model": "test-model", "input": [photo_message], "stream": False})
    assert first.status_code == 200
    first_id = first.json()["id"]

    # Turn 2: text + file, referencing turn 1
    report_message = {
        "type": "message",
        "role": "user",
        "content": [
            {"type": "input_text", "text": "Now summarize this report"},
            {
                "type": "input_file",
                "file_url": "https://example.com/report.pdf",
                "filename": "report.pdf",
            },
        ],
    }
    second = await _post_json(
        server,
        {
            "model": "test-model",
            "input": [report_message],
            "stream": False,
            "previous_response_id": first_id,
        },
    )
    assert second.status_code == 200
    assert second.json()["status"] == "completed"

    # The second agent call receives turn-1 history followed by the new input.
    received = agent.run.call_args_list[1].kwargs["messages"]
    assert len(received) >= 2

    # Turn 1's assistant reply is part of the history.
    replayed_assistant_texts = []
    for msg in received:
        if msg.role == "assistant":
            replayed_assistant_texts.extend(part.text for part in msg.contents if part.type == "text")
    assert "I see a landscape" in replayed_assistant_texts

    # The newest message is the user's text + file (the file arrives as a URI content).
    newest = received[-1]
    assert newest.role == "user"
    assert len(newest.contents) == 2
    text_part = newest.contents[0]
    assert text_part.type == "text"
    assert text_part.text == "Now summarize this report"
    file_part = newest.contents[1]
    assert file_part.type == "uri"
    assert file_part.uri == "https://example.com/report.pdf"
async def test_multi_turn_function_call_then_text_and_image(self) -> None:
    """Turn 1 yields a function call, its result and text; turn 2 adds text + image on top of that history."""
    lookup_turn = AgentResponse(
        messages=[
            Message(
                role="assistant",
                contents=[Content.from_function_call("call_1", "get_info", arguments='{"id": 1}')],
            ),
            Message(role="tool", contents=[Content.from_function_result("call_1", result="info data")]),
            Message(role="assistant", contents=[Content.from_text("Here is the info")]),
        ]
    )
    comparison_turn = AgentResponse(
        messages=[Message(role="assistant", contents=[Content.from_text("Image matches the data")])]
    )
    agent = _make_multi_response_agent([lookup_turn, comparison_turn])
    server = _make_server(agent)

    # Turn 1: text triggers function call
    first = await _post(server, input_text="Get info for item 1", stream=False)
    assert first.status_code == 200
    first_body = first.json()
    first_id = first_body["id"]
    emitted_types = [item["type"] for item in first_body["output"]]
    for expected_type in ("function_call", "function_call_output", "message"):
        assert expected_type in emitted_types

    # Turn 2: text + image referencing turn 1
    image_message = {
        "type": "message",
        "role": "user",
        "content": [
            {"type": "input_text", "text": "Does this image match?"},
            {"type": "input_image", "image_url": "https://example.com/item1.jpg"},
        ],
    }
    second = await _post_json(
        server,
        {
            "model": "test-model",
            "input": [image_message],
            "stream": False,
            "previous_response_id": first_id,
        },
    )
    assert second.status_code == 200
    assert second.json()["status"] == "completed"

    received = agent.run.call_args_list[1].kwargs["messages"]

    def contents_for(role: str, kind: str) -> list[Any]:
        """Collect contents of type *kind* from the received messages authored by *role*."""
        return [part for msg in received if msg.role == role for part in msg.contents if part.type == kind]

    # The history carries both halves of turn 1's function call.
    assert any(call.name == "get_info" for call in contents_for("assistant", "function_call"))
    assert any(outcome.result == "info data" for outcome in contents_for("tool", "function_result"))

    # The newest message is the user's text + image.
    newest = received[-1]
    assert newest.role == "user"
    assert len(newest.contents) == 2
    text_part = newest.contents[0]
    assert text_part.type == "text"
    assert text_part.text == "Does this image match?"
    image_part = newest.contents[1]
    assert image_part.type == "uri"
    assert image_part.uri == "https://example.com/item1.jpg"
# endregion
# region Function approval round-trip
class TestFunctionApprovalStorage:
    """Exercise the in-memory and file-backed function approval stores directly."""

    async def test_in_memory_save_and_load(self) -> None:
        """A request saved in memory can be loaded back under the same id."""
        store = InMemoryFunctionApprovalStorage()
        pending = _make_function_approval_request_content(request_id="apr_1")
        await store.save_approval_request("apr_1", pending)

        restored = await store.load_approval_request("apr_1")

        assert restored.type == "function_approval_request"
        assert restored.id == "apr_1"  # type: ignore[attr-defined]

    async def test_in_memory_duplicate_save_raises(self) -> None:
        """Saving twice under one id is rejected with ``ValueError``."""
        store = InMemoryFunctionApprovalStorage()
        pending = _make_function_approval_request_content(request_id="apr_1")
        await store.save_approval_request("apr_1", pending)

        with pytest.raises(ValueError, match="already exists"):
            await store.save_approval_request("apr_1", pending)

    async def test_in_memory_missing_load_raises(self) -> None:
        """Loading an id that was never saved raises ``KeyError``."""
        store = InMemoryFunctionApprovalStorage()

        with pytest.raises(KeyError):
            await store.load_approval_request("missing")

    async def test_file_based_save_and_load_persists_across_instances(self, tmp_path: Any) -> None:
        """A request written by one file-backed store is readable by a second store on the same path."""
        approvals_file = tmp_path / "subdir" / "approvals.json"
        writer = FileBasedFunctionApprovalStorage(str(approvals_file))
        pending = _make_function_approval_request_content(request_id="apr_1")
        await writer.save_approval_request("apr_1", pending)

        # Saving creates the file, including the not-yet-existing parent directory.
        assert approvals_file.exists()

        # A fresh instance sees what the first one wrote.
        reader = FileBasedFunctionApprovalStorage(str(approvals_file))
        restored = await reader.load_approval_request("apr_1")

        assert restored.type == "function_approval_request"
        assert restored.id == "apr_1"  # type: ignore[attr-defined]
        # The nested function_call is preserved by the save/load cycle.
        assert restored.function_call.name == "delete_file"  # type: ignore[attr-defined]

    async def test_file_based_duplicate_save_raises(self, tmp_path: Any) -> None:
        """The file-backed store also rejects a second save under the same id."""
        store = FileBasedFunctionApprovalStorage(str(tmp_path / "approvals.json"))
        pending = _make_function_approval_request_content(request_id="apr_1")
        await store.save_approval_request("apr_1", pending)

        with pytest.raises(ValueError, match="already exists"):
            await store.save_approval_request("apr_1", pending)

    async def test_file_based_missing_load_raises(self, tmp_path: Any) -> None:
        """The file-backed store raises ``KeyError`` for an unknown id."""
        store = FileBasedFunctionApprovalStorage(str(tmp_path / "approvals.json"))

        with pytest.raises(KeyError):
            await store.load_approval_request("missing")
class TestFunctionApprovalConversion:
    """Cover the approval-aware branches of `_item_to_message` and `_output_item_to_message`."""

    @staticmethod
    async def _storage_with_saved_request(request_id: str = "apr-1") -> InMemoryFunctionApprovalStorage:
        """Return an in-memory store that already holds an approval request under *request_id*."""
        storage = InMemoryFunctionApprovalStorage()
        saved = _make_function_approval_request_content(request_id=request_id)
        await storage.save_approval_request(request_id, saved)
        return storage

    @staticmethod
    def _approval_request_fields() -> dict[str, Any]:
        """Return a fresh ``mcp_approval_request`` payload with id ``apr-1``."""
        return {
            "type": "mcp_approval_request",
            "id": "apr-1",
            "server_label": "srv",
            "name": "dangerous_tool",
            "arguments": "{}",
        }

    async def test_output_item_mcp_approval_request_loads_from_storage(self) -> None:
        """An output approval request is swapped for the Content saved under its id."""
        from azure.ai.agentserver.responses.models import OutputItemMcpApprovalRequest

        storage = await self._storage_with_saved_request()
        item = OutputItemMcpApprovalRequest(self._approval_request_fields())

        msg = await _output_item_to_message(item, approval_storage=storage)

        assert msg.role == "assistant"
        restored = msg.contents[0]
        assert restored.type == "function_approval_request"
        assert restored.id == "apr-1"  # type: ignore[attr-defined]
        # The saved Content comes back whole, function_call included.
        assert restored.function_call.name == "delete_file"  # type: ignore[attr-defined]

    async def test_output_item_mcp_approval_request_without_storage_raises(self) -> None:
        """Converting an output approval request with no storage raises ``ValueError``."""
        from azure.ai.agentserver.responses.models import OutputItemMcpApprovalRequest

        item = OutputItemMcpApprovalRequest(self._approval_request_fields())

        with pytest.raises(ValueError, match="ApprovalStorage is required"):
            await _output_item_to_message(item)

    async def test_output_item_mcp_approval_response_resolves_to_approval_response(self) -> None:
        """An output approval response becomes a user-role ``function_approval_response``."""
        from azure.ai.agentserver.responses.models import OutputItemMcpApprovalResponseResource

        storage = await self._storage_with_saved_request()
        item = OutputItemMcpApprovalResponseResource({
            "type": "mcp_approval_response",
            "id": "resp-1",
            "approval_request_id": "apr-1",
            "approve": True,
        })

        msg = await _output_item_to_message(item, approval_storage=storage)

        assert msg.role == "user"
        resolved = msg.contents[0]
        assert resolved.type == "function_approval_response"
        assert resolved.approved is True  # type: ignore[attr-defined]
        assert resolved.id == "apr-1"  # type: ignore[attr-defined]
        assert resolved.function_call.name == "delete_file"  # type: ignore[attr-defined]

    async def test_output_item_mcp_approval_response_without_storage_raises(self) -> None:
        """Converting an output approval response with no storage raises ``ValueError``."""
        from azure.ai.agentserver.responses.models import OutputItemMcpApprovalResponseResource

        item = OutputItemMcpApprovalResponseResource({
            "type": "mcp_approval_response",
            "id": "resp-1",
            "approval_request_id": "apr-1",
            "approve": False,
        })

        with pytest.raises(ValueError, match="ApprovalStorage is required"):
            await _output_item_to_message(item)

    async def test_input_item_mcp_approval_request_loads_from_storage(self) -> None:
        """An input approval request is likewise resolved from storage."""
        from azure.ai.agentserver.responses.models import ItemMcpApprovalRequest

        storage = await self._storage_with_saved_request()
        item = ItemMcpApprovalRequest(self._approval_request_fields())

        msg = await _item_to_message(item, approval_storage=storage)

        assert msg.role == "assistant"
        restored = msg.contents[0]
        assert restored.type == "function_approval_request"
        assert restored.id == "apr-1"  # type: ignore[attr-defined]

    async def test_input_item_mcp_approval_response_resolves_to_approval_response(self) -> None:
        """An input approval response with ``approve=False`` yields ``approved is False``."""
        from azure.ai.agentserver.responses.models import MCPApprovalResponse

        storage = await self._storage_with_saved_request()
        item = MCPApprovalResponse({
            "type": "mcp_approval_response",
            "approval_request_id": "apr-1",
            "approve": False,
        })

        msg = await _item_to_message(item, approval_storage=storage)  # type: ignore[arg-type]

        assert msg.role == "user"
        resolved = msg.contents[0]
        assert resolved.type == "function_approval_response"
        assert resolved.approved is False  # type: ignore[attr-defined]
class TestFunctionApprovalRoundTrip:
    """End-to-end round-trip tests for the function approval flow.

    Turn 1: the agent emits a `function_approval_request` content; the
    server emits an `mcp_approval_request` output item and persists
    the original Content under the emitted id in approval storage.

    Turn 2: the caller sends an `mcp_approval_response` input item back;
    the server resolves it (via approval storage) into a
    `function_approval_response` content delivered to the agent.
    """

    async def test_non_streaming_emits_mcp_approval_request_and_persists_to_storage(self) -> None:
        """A non-streaming response carries one approval item whose id resolves in storage."""
        request_content = _make_function_approval_request_content()
        agent = _make_agent(response=AgentResponse(messages=[Message(role="assistant", contents=[request_content])]))
        server = _make_server(agent)

        resp = await _post(server, stream=False)
        assert resp.status_code == 200
        body = resp.json()
        assert body["status"] == "completed"

        approval_items = [item for item in body["output"] if item["type"] == "mcp_approval_request"]
        assert len(approval_items) == 1
        approval_request_id = approval_items[0]["id"]
        assert approval_items[0]["name"] == "delete_file"
        assert approval_items[0]["server_label"] == "my_server"

        # Storage must contain a saved entry under the emitted request id.
        loaded = await server._approval_storage.load_approval_request(  # pyright: ignore[reportPrivateUsage]
            approval_request_id
        )
        assert loaded.type == "function_approval_request"
        assert loaded.function_call.name == "delete_file"  # type: ignore[attr-defined]

    async def test_streaming_emits_mcp_approval_request_and_persists_to_storage(self) -> None:
        """A streamed response announces the approval item and its id resolves in storage."""
        request_content = _make_function_approval_request_content(request_id="apr_streaming")
        agent = _make_agent(stream_updates=[AgentResponseUpdate(contents=[request_content], role="assistant")])
        server = _make_server(agent)

        resp = await _post(server, stream=True)
        assert resp.status_code == 200
        events = _parse_sse_events(resp.text)
        types = _sse_event_types(events)
        assert types[0] == "response.created"
        assert types[-1] == "response.completed"

        # Take the id of the first approval item announced via ``response.output_item.added``.
        added_items = (e["data"].get("item") or {} for e in events if e["event"] == "response.output_item.added")
        approval_request_id: str | None = next(
            (item.get("id") for item in added_items if item.get("type") == "mcp_approval_request"),
            None,
        )
        assert approval_request_id is not None

        loaded = await server._approval_storage.load_approval_request(  # pyright: ignore[reportPrivateUsage]
            approval_request_id
        )
        assert loaded.type == "function_approval_request"

    async def test_round_trip_approval_response_reaches_agent(self) -> None:
        """Two-turn: turn 1 emits an approval request; turn 2 sends an
        approval response and the agent receives a `function_approval_response`."""
        request_content = _make_function_approval_request_content()
        agent = _make_multi_response_agent(
            responses=[
                AgentResponse(messages=[Message(role="assistant", contents=[request_content])]),
                AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text("done")])]),
            ]
        )
        server = _make_server(agent)

        first = await _post(server, stream=False)
        assert first.status_code == 200
        first_body = first.json()
        approval_items = [item for item in first_body["output"] if item["type"] == "mcp_approval_request"]
        assert len(approval_items) == 1
        approval_request_id = approval_items[0]["id"]

        # Send back an approval response that references the saved request id.
        second_payload: dict[str, Any] = {
            "model": "test-model",
            "input": [
                {
                    "type": "mcp_approval_response",
                    "approval_request_id": approval_request_id,
                    "approve": True,
                }
            ],
            "stream": False,
        }
        second = await _post_json(server, second_payload)
        assert second.status_code == 200

        # The agent's second invocation must have received a
        # function_approval_response content carrying the original function_call.
        assert agent.run.call_count == 2
        second_call_kwargs = agent.run.call_args_list[1].kwargs
        approval_responses = [
            c for m in second_call_kwargs["messages"] for c in m.contents if c.type == "function_approval_response"
        ]
        assert len(approval_responses) == 1
        assert approval_responses[0].approved is True
        assert approval_responses[0].function_call.name == "delete_file"

    async def test_round_trip_approval_response_rejected(self) -> None:
        """Same as above but the user rejects the approval; the agent must
        receive `approved=False`."""
        request_content = _make_function_approval_request_content()
        agent = _make_multi_response_agent(
            responses=[
                AgentResponse(messages=[Message(role="assistant", contents=[request_content])]),
                AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text("ok")])]),
            ]
        )
        server = _make_server(agent)

        first = await _post(server, stream=False)
        # Check turn 1 explicitly so a failure there is reported as an assertion
        # on the first response rather than as a KeyError/StopIteration below.
        assert first.status_code == 200
        approval_items = [item for item in first.json()["output"] if item["type"] == "mcp_approval_request"]
        assert len(approval_items) == 1
        approval_request_id = approval_items[0]["id"]

        second = await _post_json(
            server,
            {
                "model": "test-model",
                "input": [
                    {
                        "type": "mcp_approval_response",
                        "approval_request_id": approval_request_id,
                        "approve": False,
                    }
                ],
                "stream": False,
            },
        )
        assert second.status_code == 200

        assert agent.run.call_count == 2
        second_call_kwargs = agent.run.call_args_list[1].kwargs
        approval_responses = [
            c for m in second_call_kwargs["messages"] for c in m.contents if c.type == "function_approval_response"
        ]
        assert len(approval_responses) == 1
        assert approval_responses[0].approved is False

    async def test_approval_response_referencing_unknown_id_fails(self) -> None:
        """Sending an `mcp_approval_response` for a request id that was
        never persisted must fail (storage raises KeyError)."""
        agent = _make_agent(
            response=AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text("ok")])])
        )
        server = _make_server(agent)

        resp = await _post_json(
            server,
            {
                "model": "test-model",
                "input": [
                    {
                        "type": "mcp_approval_response",
                        "approval_request_id": "apr_unknown",
                        "approve": True,
                    }
                ],
                "stream": False,
            },
        )
        # The handler raises a KeyError when the storage lookup misses;
        # the hosting layer surfaces this as a 5xx response.
        assert resp.status_code >= 500
# endregion
# region Checkpoint context path validation
class TestCheckpointContextPathValidation:
"""Regression tests for the path-traversal hardening of checkpoint storage.
These tests guard against CWE-22 in the workflow hosting path. The hosting
code joins caller-supplied identifiers (``previous_response_id``) and
server-generated identifiers (``conversation_id`` / ``response_id``) under
the configured checkpoint root. Without validation, traversal segments
such as ``../../escape`` or absolute paths cause directory creation
outside the intended root.
"""
@staticmethod
def _helper() -> Callable[[str, str], FileCheckpointStorage]:
    """Return the private ``_checkpoint_storage_for_context`` factory under test.

    The import is done inside the function, so each call resolves the name at call time.
    """
    from agent_framework_foundry_hosting._responses import (  # pyright: ignore[reportPrivateUsage]
        _checkpoint_storage_for_context as storage_factory,
    )

    return storage_factory
@staticmethod
def _checkpoint_with_azure_message_role() -> WorkflowCheckpoint:
    """Build a checkpoint whose single queued message uses the Azure SDK ``MessageRole`` enum as its role."""
    from azure.ai.agentserver.responses.models import MessageRole

    user_message = Message(role=MessageRole.USER, contents=[Content.from_text("hello")])
    queued = WorkflowMessage(data=user_message, source_id="source")
    return WorkflowCheckpoint(
        workflow_name="wf",
        graph_signature_hash="hash",
        messages={"executor": [queued]},
    )
def test_valid_segment_creates_storage_under_root(self, tmp_path: Any) -> None:
    """A plain id yields an existing storage directory directly beneath the root."""
    make_storage = self._helper()
    checkpoint_root = tmp_path / "root"
    checkpoint_root.mkdir()

    created = make_storage(str(checkpoint_root), "resp_abc123")

    assert created.storage_path.is_dir()
    assert created.storage_path.parent == checkpoint_root.resolve()
def test_azure_message_role_allowlist_type_matches_generated_sdk_path(self) -> None:
    """The allowlist constant equals the expected ``module:qualname`` string for ``MessageRole``."""
    expected_type_key = "azure.ai.agentserver.responses.models._generated.sdk.models.models._enums:MessageRole"
    assert _AZURE_RESPONSES_MESSAGE_ROLE_TYPE == expected_type_key
async def test_storage_allows_azure_message_role_checkpoint_restore(self, tmp_path: Any) -> None:
    """Storage built by the hosting helper can load a checkpoint containing an Azure ``MessageRole``."""
    from azure.ai.agentserver.responses.models import MessageRole

    make_storage = self._helper()
    checkpoint_root = tmp_path / "root"
    checkpoint_root.mkdir()
    storage = make_storage(str(checkpoint_root), "resp_abc123")

    original = self._checkpoint_with_azure_message_role()
    await storage.save(original)
    restored = await storage.load(original.checkpoint_id)

    restored_message = restored.messages["executor"][0].data
    assert isinstance(restored_message, Message)
    # The role must come back as the enum itself, not merely an equal-valued object.
    assert type(restored_message.role) is MessageRole
    assert restored_message.role == MessageRole.USER
    assert restored_message.text == "hello"
async def test_plain_storage_blocks_azure_message_role_checkpoint_restore(self, tmp_path: Any) -> None:
    """A directly constructed ``FileCheckpointStorage`` refuses to load the ``MessageRole`` checkpoint."""
    plain_storage = FileCheckpointStorage(tmp_path / "plain")
    saved = self._checkpoint_with_azure_message_role()
    await plain_storage.save(saved)

    with pytest.raises(WorkflowCheckpointException, match="MessageRole"):
        await plain_storage.load(saved.checkpoint_id)
async def test_get_latest_restores_azure_message_role(self, tmp_path: Any) -> None:
    """``get_latest`` on helper-built storage returns the saved checkpoint with its ``MessageRole`` intact."""
    from azure.ai.agentserver.responses.models import MessageRole

    make_storage = self._helper()
    checkpoint_root = tmp_path / "root"
    checkpoint_root.mkdir()
    storage = make_storage(str(checkpoint_root), "resp_abc123")

    saved = self._checkpoint_with_azure_message_role()
    await storage.save(saved)
    newest = await storage.get_latest(workflow_name="wf")

    assert newest is not None
    assert newest.checkpoint_id == saved.checkpoint_id
    newest_message = newest.messages["executor"][0].data
    assert isinstance(newest_message, Message)
    assert type(newest_message.role) is MessageRole
async def test_get_latest_silently_skips_without_allowlist(
    self, tmp_path: Any, caplog: pytest.LogCaptureFixture
) -> None:
    """Directly constructed storage returns ``None`` from ``get_latest`` and logs a message naming ``MessageRole``."""
    import logging

    plain_storage = FileCheckpointStorage(tmp_path / "plain")
    saved = self._checkpoint_with_azure_message_role()
    await plain_storage.save(saved)

    with caplog.at_level(logging.WARNING, logger="agent_framework"):
        newest = await plain_storage.get_latest(workflow_name="wf")

    assert newest is None
    assert any("MessageRole" in logged for logged in caplog.messages)
async def test_handle_inner_workflow_restores_message_role_checkpoint_from_previous_response(
    self, tmp_path: Any
) -> None:
    """``_handle_inner_workflow`` restores from the previous response's checkpoint, then runs the new turn.

    A checkpoint is saved under the previous response id. The handler is
    expected to call ``agent.run`` twice: first to restore that checkpoint
    from the previous response's storage directory, then to run the new
    input against a storage directory named after the current response id.
    """
    from azure.ai.agentserver.responses import ResponseContext
    from azure.ai.agentserver.responses.models import CreateResponse, ItemMessage

    prior_id = "resp_previous"
    current_id = "resp_current"
    checkpoint_root = tmp_path / "root"
    checkpoint_root.mkdir()

    # Seed a checkpoint where the handler will look for the previous turn.
    prior_storage = self._helper()(str(checkpoint_root), prior_id)
    seeded = self._checkpoint_with_azure_message_role()
    await prior_storage.save(seeded)

    # Mock shaped like a WorkflowAgent, reporting that no checkpointing is configured yet.
    agent = MagicMock(spec=WorkflowAgent)
    agent.configure_mock(id="wf-agent", name="wf", description="", context_providers=[])
    agent.workflow = MagicMock()
    agent.workflow.name = "wf"
    agent.workflow._runner_context.has_checkpointing = MagicMock(return_value=False)
    restore_result = AgentResponse(messages=[])
    new_turn_result = AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text("ok")])])
    agent.run = AsyncMock(side_effect=[restore_result, new_turn_result])

    server = ResponsesHostServer(agent, store=InMemoryResponseProvider())
    server._checkpoint_storage_path = str(checkpoint_root)  # pyright: ignore[reportPrivateUsage]

    request = CreateResponse(model="m", input="hi", previous_response_id=prior_id)
    context = ResponseContext(response_id=current_id, previous_response_id=prior_id, mode_flags=MagicMock())
    next_turn_item = ItemMessage({"type": "message", "role": "user", "content": "next turn"})

    # Drain the handler with the input items stubbed to the single new message.
    with patch.object(ResponseContext, "get_input_items", new=AsyncMock(return_value=[next_turn_item])):
        async for _ in server._handle_inner_workflow(request, context):  # pyright: ignore[reportPrivateUsage]
            pass

    assert agent.run.call_count == 2
    restore_call, new_turn_call = agent.run.call_args_list[0], agent.run.call_args_list[1]

    # First call: restore from the previous response's directory.
    assert restore_call.kwargs["checkpoint_id"] == seeded.checkpoint_id
    assert restore_call.kwargs["checkpoint_storage"].storage_path == (checkpoint_root / prior_id).resolve()

    # Second call: only the new input, checkpointed under the current response id.
    forwarded = new_turn_call.args[0]
    assert len(forwarded) == 1
    assert forwarded[0].text == "next turn"
    assert new_turn_call.kwargs["checkpoint_storage"].storage_path == (checkpoint_root / current_id).resolve()
@pytest.mark.parametrize(
    "bad_id",
    [
        # Payload from the original MSRC report: a traversal hidden inside an
        # id-looking value. The trailing 14 ``A`` characters reproduce the
        # length of the ``api-made-dir<14-char-suffix>`` segment in that report.
        "caresp_x/../../service-data/api-made-dir" + "A" * 14,
        # Further payloads from variant reports.
        "../../escape",
        "..",
        ".",
        "...",
        "/tmp/escape",
        "/absolute/path",
        "C:\\temp\\escape",
        "..\\..\\escape",
        "foo\\..\\bar",
        "foo/bar",
        "with\x00null",
        "",
    ],
)
def test_traversal_and_separator_payloads_are_rejected(self, tmp_path: Any, bad_id: str) -> None:
    """Each malformed id raises ``RuntimeError`` and leaves the filesystem untouched."""

    def top_level_names() -> list[str]:
        """Return the sorted entry names directly under ``tmp_path``."""
        return sorted(entry.name for entry in tmp_path.iterdir())

    make_storage = self._helper()
    # The root lives *inside* tmp_path so the whole of tmp_path (root and any
    # siblings) can be checked for stray entries. tmp_path.parent is shared
    # between tests, so comparing against it would be flaky under parallel runs.
    root = tmp_path / "root"
    root.mkdir()
    names_before = top_level_names()

    with pytest.raises(RuntimeError):
        make_storage(str(root), bad_id)

    # Nothing may have appeared alongside the root...
    names_after = top_level_names()
    assert names_before == names_after, f"Unexpected filesystem artifacts created for payload {bad_id!r}"
    # ...nor inside it.
    assert list(root.iterdir()) == []
def test_non_string_context_id_is_rejected(self, tmp_path: Any) -> None:
    """Passing ``None`` as the context id raises ``RuntimeError``."""
    make_storage = self._helper()

    with pytest.raises(RuntimeError):
        make_storage(str(tmp_path), None)  # type: ignore[arg-type]
def test_url_encoded_traversal_is_treated_as_literal_segment(self, tmp_path: Any) -> None:
    """A percent-encoded ``..`` is accepted as an ordinary directory name under the root.

    The helper receives ``%2e%2e`` verbatim; the resulting storage path must
    be a direct child of the root whose name is that literal string, i.e. no
    decoding into a traversal segment takes place.
    """
    make_storage = self._helper()
    checkpoint_root = tmp_path / "root"
    checkpoint_root.mkdir()

    created_path = make_storage(str(checkpoint_root), "%2e%2e").storage_path

    assert created_path.parent == checkpoint_root.resolve()
    assert created_path.name == "%2e%2e"
@pytest.mark.parametrize(
    "context_field,bad_id",
    [
        # Restore sink: caller-controlled previous_response_id.
        ("previous_response_id", "../../escape"),
        ("previous_response_id", "/tmp/escape-abs"),
        ("previous_response_id", "caresp_x/../../service-data/api-made-dir" + "A" * 14),
        # Restore sink: server-issued conversation_id (defense in depth).
        ("conversation_id", "../../escape"),
        # Write sink: malicious response_id (defense in depth).
        ("response_id", "../../escape"),
    ],
)
async def test_handle_inner_workflow_rejects_malicious_context_id(
    self, tmp_path: Any, context_field: str, bad_id: str
) -> None:
    """End-to-end: ``_handle_inner_workflow`` must reject malicious ids on
    both the restore sink (``previous_response_id`` / ``conversation_id``)
    and the write sink (``response_id``) without creating any directories.

    ``context_field`` names the ``ResponseContext`` keyword that receives
    ``bad_id``. Iterating the handler is expected to raise ``RuntimeError``
    matching ``"Invalid checkpoint context id"``.
    """
    # ``patch`` and ``WorkflowAgent`` are already imported at module level;
    # only the names not imported at the top of the file are imported here.
    from azure.ai.agentserver.responses import ResponseContext
    from azure.ai.agentserver.responses.models import CreateResponse

    # Build a mock that satisfies isinstance(agent, WorkflowAgent) and the
    # constructor's "no existing checkpointing" guard.
    agent = MagicMock(spec=WorkflowAgent)
    agent.id = "wf-agent"
    agent.name = "wf"
    agent.description = ""
    agent.context_providers = []
    agent.workflow = MagicMock()
    agent.workflow.name = "wf"
    agent.workflow._runner_context.has_checkpointing = MagicMock(  # pyright: ignore[reportPrivateUsage]
        return_value=False
    )

    # Constructor inspects WorkflowAgent.workflow internals; bypass setup
    # by feeding a configured mock through a normal init.
    server = ResponsesHostServer(agent, store=InMemoryResponseProvider())

    # Re-root checkpoint storage at our isolated tmp_path so we can detect
    # any escape attempt on the filesystem.
    root = tmp_path / "root"
    root.mkdir()
    server._checkpoint_storage_path = str(root)  # pyright: ignore[reportPrivateUsage]

    # Only the ``previous_response_id`` case also carries the id on the
    # request; the other sinks are reached through the context alone.
    request_fields: dict[str, Any] = {"model": "m", "input": "hi"}
    if context_field == "previous_response_id":
        request_fields["previous_response_id"] = bad_id
    request = CreateResponse(**request_fields)

    # Build a ResponseContext with the malicious id targeting the chosen
    # sink. ``context_field`` is the keyword name itself, so for the
    # ``response_id`` case this overwrites the well-formed default below.
    context_kwargs: dict[str, Any] = {
        "response_id": "resp_" + "a" * 48,
        "mode_flags": MagicMock(),
    }
    context_kwargs[context_field] = bad_id

    # Avoid invoking the real input-resolution machinery, which would need
    # a configured provider; we never reach the workflow run on rejection.
    with patch.object(ResponseContext, "get_input_items", new=AsyncMock(return_value=[])):
        context = ResponseContext(**context_kwargs)
        before = sorted(p.name for p in tmp_path.iterdir())
        with pytest.raises(RuntimeError, match="Invalid checkpoint context id"):
            async for _ in server._handle_inner_workflow(request, context):  # pyright: ignore[reportPrivateUsage]
                pass
        after = sorted(p.name for p in tmp_path.iterdir())

    assert before == after, f"Unexpected filesystem artifacts created for {context_field}={bad_id!r}"
    assert list(root.iterdir()) == [], f"Checkpoint dir created inside root for {context_field}={bad_id!r}"
@pytest.mark.parametrize(
    "context_field,bad_id",
    [
        # Restore sink: caller-controlled previous_response_id. These are
        # rejected by request validation (HTTP 400) before the checkpoint
        # code is reached.
        ("previous_response_id", "../../escape"),
        ("previous_response_id", "/tmp/escape-abs"),
        ("previous_response_id", "caresp_x/../../service-data/api-made-dir" + "A" * 14),
        # Restore sink: server-issued conversation id (defense in depth).
        # Reaches the checkpoint code and is rejected there, surfacing as
        # an HTTP 5xx without creating any filesystem artifacts.
        ("conversation", "../../escape"),
        ("conversation", "/tmp/escape-abs"),
    ],
)
async def test_malicious_context_id_rejected_e2e(self, tmp_path: Any, context_field: str, bad_id: str) -> None:
    """End-to-end (ASGI-in-process): malicious context ids must be rejected
    through the full HTTP pipeline, and no checkpoint directory may be
    created on disk for either the validation-layer rejection
    (``previous_response_id``) or the deeper checkpoint-layer rejection
    (``conversation``).

    The ``response_id`` write-sink is server-generated and not reachable
    via the public HTTP surface, so its defense-in-depth check is covered
    by the helper-level test above.
    """
    # ``WorkflowAgent`` is imported at module level, so no local import is
    # needed here.

    # Build a mock that satisfies isinstance(agent, WorkflowAgent) and the
    # constructor's "no existing checkpointing" guard.
    agent = MagicMock(spec=WorkflowAgent)
    agent.id = "wf-agent"
    agent.name = "wf"
    agent.description = ""
    agent.context_providers = []
    agent.workflow = MagicMock()
    agent.workflow.name = "wf"
    agent.workflow._runner_context.has_checkpointing = MagicMock(  # pyright: ignore[reportPrivateUsage]
        return_value=False
    )
    server = ResponsesHostServer(agent, store=InMemoryResponseProvider())

    # Re-root checkpoint storage at our isolated tmp_path so we can detect
    # any escape attempt on the filesystem.
    root = tmp_path / "root"
    root.mkdir()
    server._checkpoint_storage_path = str(root)  # pyright: ignore[reportPrivateUsage]

    # ``context_field`` is the JSON key of the request body that carries the
    # malicious id (``previous_response_id`` or ``conversation``).
    payload: dict[str, Any] = {"model": "m", "input": "hi"}
    payload[context_field] = bad_id

    before = sorted(p.name for p in tmp_path.iterdir())
    transport = httpx.ASGITransport(app=server)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        resp = await client.post("/responses", json=payload)
    after = sorted(p.name for p in tmp_path.iterdir())

    # The request must not succeed; either request validation rejects it
    # (4xx) or the checkpoint layer raises and the server returns 5xx.
    # Either way, no successful response may be produced.
    assert resp.status_code >= 400, (
        f"Expected non-2xx for {context_field}={bad_id!r}, got {resp.status_code}: {resp.text[:200]}"
    )
    assert before == after, (
        f"Unexpected filesystem artifacts under tmp_path for {context_field}={bad_id!r}: "
        f"before={before} after={after}"
    )
    assert list(root.iterdir()) == [], f"Checkpoint directory created inside root for {context_field}={bad_id!r}"
# region Agent lifecycle (lazy entry & OAuth consent surfacing)
def _make_consent_error(
    url: str = "https://consent.example.com/auth",
    name: str = "Foundry Toolbox",
) -> Exception:
    """Build an exception wrapping a Foundry MCP gateway consent error.

    Mirrors the real-world wrapping produced by ``MCPStreamableHTTPTool.__aenter__``,
    which catches connection-time ``McpError``s and re-raises them as a
    ``ToolExecutionException`` (an ``AgentFrameworkException`` subclass) with the
    original error attached via ``inner_exception``. ``consent_url_from_error``
    then finds the wrapped ``McpError`` in ``exc.args``.

    The McpError message uses the structured Foundry MCP gateway format:
    a human-readable prefix followed by a JSON document describing each
    failed tool source and its consent URL.

    Args:
        url: Consent URL, placed in the ``error.message`` field of the JSON.
        name: Name of the failed tool source, placed in the ``name`` field.

    Returns:
        A ``ToolExecutionException`` whose ``inner_exception`` is the
        ``McpError`` carrying ``CONSENT_ERROR_CODE``.
    """
    from agent_framework.exceptions import ToolExecutionException

    # One failed tool source, described the way the gateway reports it.
    failed_source = {
        "name": name,
        "type": "mcp",
        "error": {
            "code": "CONSENT_REQUIRED",
            "message": url,
        },
    }
    gateway_json = json.dumps({"errors": [failed_source]})
    error_data = ErrorData(
        code=CONSENT_ERROR_CODE,
        message=f"tools/list failed for 1 tool source(s), succeeded for 0 tool source(s) {gateway_json}",
    )
    return ToolExecutionException("MCP consent required", inner_exception=McpError(error_data))
class TestConsentUrlFromError:
    """Unit tests for ``consent_url_from_error``."""

    def test_returns_consent_url_when_inner_arg_is_consent_mcp_error(self) -> None:
        """A wrapped consent ``McpError`` yields one ``ConsentError`` entry."""
        expected = [ConsentError(name="my-tool", consent_url="https://example.com/consent")]
        wrapped = _make_consent_error("https://example.com/consent", name="my-tool")
        assert consent_url_from_error(wrapped) == expected

    def test_returns_none_when_no_mcp_error_in_args(self) -> None:
        """A plain exception with no ``McpError`` argument yields ``None``."""
        plain = Exception("boom")
        assert consent_url_from_error(plain) is None

    def test_returns_none_when_mcp_error_has_different_code(self) -> None:
        """A wrapped ``McpError`` with a non-consent code yields ``None``."""
        other_error = McpError(ErrorData(code=-32000, message="some other error"))
        wrapped = Exception("wrapped", other_error)
        assert consent_url_from_error(wrapped) is None

    def test_returns_none_for_bare_mcp_error_without_wrapping(self) -> None:
        """An unwrapped consent ``McpError`` yields ``None``."""
        # `args` of a bare McpError holds the message string, not an McpError
        # instance, so it does not match the wrapping pattern produced by the
        # MCP client when it bubbles consent errors up.
        unwrapped = McpError(ErrorData(code=CONSENT_ERROR_CODE, message="https://x"))
        assert consent_url_from_error(unwrapped) is None

    def test_returns_none_when_message_has_no_json(self) -> None:
        """A consent-coded error whose message carries no JSON yields ``None``."""
        from agent_framework.exceptions import ToolExecutionException

        error_data = ErrorData(code=CONSENT_ERROR_CODE, message="no json here")
        wrapped = ToolExecutionException("MCP consent required", inner_exception=McpError(error_data))
        assert consent_url_from_error(wrapped) is None
class TestAgentLifecycle:
    """Tests for when the hosted agent's ``__aenter__`` / ``__aexit__`` are awaited."""

    @staticmethod
    def _greeting_agent() -> Any:
        """Return the agent every test in this class uses: it replies with the text ``hi``."""
        reply = Message(role="assistant", contents=[Content.from_text("hi")])
        return _make_agent(response=AgentResponse(messages=[reply]))

    async def test_agent_entered_lazily_on_first_request(self) -> None:
        """Building the server does not enter the agent; the first request does."""
        agent = self._greeting_agent()
        server = _make_server(agent)

        # Construction must not enter the agent.
        assert agent.__aenter__.await_count == 0

        await _post(server, input_text="hello", stream=False)
        assert agent.__aenter__.await_count == 1

    async def test_agent_entered_only_once_across_requests(self) -> None:
        """Three consecutive requests enter the agent exactly once."""
        agent = self._greeting_agent()
        server = _make_server(agent)

        for text in ("first", "second", "third"):
            await _post(server, input_text=text, stream=False)

        assert agent.__aenter__.await_count == 1

    async def test_cleanup_exits_agent_and_allows_reentry(self) -> None:
        """``_cleanup_agent`` exits the agent once and a later request re-enters it."""
        agent = self._greeting_agent()
        server = _make_server(agent)

        await _post(server, input_text="hello", stream=False)
        assert agent.__aenter__.await_count == 1
        assert agent.__aexit__.await_count == 0

        await server._cleanup_agent()  # pyright: ignore[reportPrivateUsage]
        assert agent.__aexit__.await_count == 1

        # Cleanup is idempotent.
        await server._cleanup_agent()  # pyright: ignore[reportPrivateUsage]
        assert agent.__aexit__.await_count == 1

        # After cleanup, a follow-up request re-enters the agent.
        await _post(server, input_text="again", stream=False)
        assert agent.__aenter__.await_count == 2

    async def test_failed_entry_does_not_cache_stack(self) -> None:
        """A failed ``__aenter__`` is retried on the next request."""
        agent = self._greeting_agent()
        # First entry raises a consent error, second entry succeeds.
        agent.__aenter__.side_effect = [_make_consent_error(), None]
        server = _make_server(agent)

        await _post(server, input_text="first", stream=False)
        # Failed entry must leave the stack empty so the next request retries.
        await _post(server, input_text="second", stream=False)

        assert agent.__aenter__.await_count == 2
class TestOAuthConsentSurfacing:
    """Tests that consent errors raised on agent entry surface as ``oauth_consent_request`` items."""

    async def test_non_streaming_consent_error_emits_oauth_output_item(self) -> None:
        """Non-streaming: a consent error becomes one ``oauth_consent_request`` output item."""
        reply = Message(role="assistant", contents=[Content.from_text("hi")])
        agent = _make_agent(response=AgentResponse(messages=[reply]))
        agent.__aenter__.side_effect = _make_consent_error("https://consent.example.com/auth")
        server = _make_server(agent)

        resp = await _post(server, input_text="hello", stream=False)

        assert resp.status_code == 200
        body = resp.json()
        assert body["status"] == "completed"

        consent_items = [item for item in body["output"] if item["type"] == "oauth_consent_request"]
        assert len(consent_items) == 1
        (consent_item,) = consent_items
        assert consent_item["consent_link"] == "https://consent.example.com/auth"
        assert consent_item["server_label"] == "Foundry Toolbox"

        # The agent must not be run when entry fails.
        agent.run.assert_not_called()

    async def test_streaming_consent_error_emits_oauth_output_item(self) -> None:
        """Streaming: the consent item appears in both ``added`` and ``done`` SSE events."""
        update = AgentResponseUpdate(contents=[Content.from_text("hi")], role="assistant")
        agent = _make_agent(stream_updates=[update])
        agent.__aenter__.side_effect = _make_consent_error("https://consent.example.com/auth")
        server = _make_server(agent)

        resp = await _post(server, input_text="hello", stream=True)

        assert resp.status_code == 200
        events = _parse_sse_events(resp.text)
        types = _sse_event_types(events)
        assert types[0] == "response.created"
        assert types[1] == "response.in_progress"
        assert types[-1] == "response.completed"

        # Items announced via ``response.output_item.added`` that are consent requests.
        consent_added = [
            event["data"]["item"]
            for event in events
            if event["event"] == "response.output_item.added"
            and event["data"]["item"]["type"] == "oauth_consent_request"
        ]
        assert len(consent_added) == 1
        assert consent_added[0]["consent_link"] == "https://consent.example.com/auth"
        assert consent_added[0]["server_label"] == "Foundry Toolbox"

        # At least one ``response.output_item.done`` event carries the consent item.
        assert any(
            event["event"] == "response.output_item.done"
            and event["data"]["item"]["type"] == "oauth_consent_request"
            for event in events
        )

        agent.run.assert_not_called()

    async def test_non_consent_error_during_entry_propagates(self) -> None:
        """A non-consent entry error marks the response failed with no consent item."""
        reply = Message(role="assistant", contents=[Content.from_text("hi")])
        agent = _make_agent(response=AgentResponse(messages=[reply]))
        agent.__aenter__.side_effect = RuntimeError("boom")
        server = _make_server(agent)

        resp = await _post(server, input_text="hello", stream=False)

        # Non-consent errors are not swallowed: the response is marked failed
        # and no `oauth_consent_request` item is emitted.
        assert resp.status_code == 200
        body = resp.json()
        assert body["status"] == "failed"
        assert all(item["type"] != "oauth_consent_request" for item in body.get("output", []))
        agent.run.assert_not_called()

    async def test_retry_after_consent_succeeds(self) -> None:
        """After a consent failure, the next request enters and runs the agent."""
        reply = Message(role="assistant", contents=[Content.from_text("hello!")])
        agent = _make_agent(response=AgentResponse(messages=[reply]))
        # Entry fails with a consent error once, then succeeds.
        agent.__aenter__.side_effect = [_make_consent_error("https://consent.example.com/auth"), None]
        server = _make_server(agent)

        # First request surfaces consent; agent.run is not called.
        first = await _post(server, input_text="first", stream=False)
        assert first.status_code == 200
        first_body = first.json()
        consent_items = [item for item in first_body["output"] if item["type"] == "oauth_consent_request"]
        assert len(consent_items) == 1
        agent.run.assert_not_called()

        # After the user authenticates, the next request enters successfully.
        second = await _post(server, input_text="second", stream=False)
        assert second.status_code == 200
        second_body = second.json()
        assert second_body["status"] == "completed"
        assert any(item["type"] == "message" for item in second_body["output"])

        assert agent.__aenter__.await_count == 2
        agent.run.assert_awaited_once()
# endregion
# region Workflow agent hosting (end-to-end)
class _ToolApprovalWorkflowAgentMock(SupportsAgentRun):
    """Inner agent for a hosted ``WorkflowAgent`` whose first run emits a
    ``FunctionApprovalRequestContent`` and whose follow-up run (after
    receiving a ``FunctionApprovalResponseContent`` in its inputs) returns a
    final assistant text response.

    Mirrors a real agent whose tool invocation requires user approval. Used
    here to exercise the full HTTP pipeline through ``ResponsesHostServer``
    when the hosted agent is a ``WorkflowAgent`` containing a tool-approval
    flow.

    ``run_count`` counts every invocation and ``last_run_messages`` holds the
    normalized input of the most recent one, so tests can inspect both.
    """

    def __init__(
        self,
        name: str,
        *,
        tool_name: str = "delete_file",
        tool_arguments: dict[str, Any] | None = None,
        approval_request_ids: Sequence[str] | None = None,
        final_text: str = "done",
    ) -> None:
        self.id = str(uuid.uuid4())
        self.name = name
        self.description: str | None = None
        self._tool_name = tool_name
        # A missing or empty mapping falls back to a sample path argument.
        self._tool_arguments = tool_arguments if tool_arguments else {"path": "/tmp/example"}
        self._approval_request_ids: list[str] = list(approval_request_ids or [])
        self._final_text = final_text
        self.run_count = 0
        self.last_run_messages: list[Message] = []

    def create_session(self, **kwargs: Any) -> AgentSession:
        """Return a fresh ``AgentSession``; keyword arguments are ignored."""
        return AgentSession()

    def get_session(self, *, service_session_id: str, **kwargs: Any) -> AgentSession:
        """Return a fresh ``AgentSession``; ``service_session_id`` is ignored."""
        return AgentSession()

    def _next_request_id(self) -> str:
        """Return the first configured approval id, or a random UUID when none was configured."""
        # Stable across calls: when the workflow checkpoint round-trips through
        # restore, ``AgentExecutor`` re-invokes the inner agent during replay.
        # We must surface the *same* approval request id on each invocation so
        # the workflow's pending-request id matches the id the test echoes
        # back as ``mcp_approval_response``.
        configured = self._approval_request_ids
        return configured[0] if configured else str(uuid.uuid4())

    def _build_approval_request(self) -> Content:
        """Build an approval-request ``Content`` wrapping a call to the configured tool."""
        request_id = self._next_request_id()
        # The same id is used for the function call and the approval request.
        pending_call = Content.from_function_call(
            call_id=request_id,
            name=self._tool_name,
            arguments=self._tool_arguments,
            additional_properties={"server_label": "test_server"},
        )
        return Content.from_function_approval_request(id=request_id, function_call=pending_call)

    @overload
    def run(
        self,
        messages: str | Content | Message | Sequence[str | Content | Message] | None = ...,
        *,
        stream: Literal[False] = ...,
        session: AgentSession | None = ...,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse[Any]]: ...

    @overload
    def run(
        self,
        messages: str | Content | Message | Sequence[str | Content | Message] | None = ...,
        *,
        stream: Literal[True],
        session: AgentSession | None = ...,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse[Any]]: ...

    def run(
        self,
        messages: str | Content | Message | Sequence[str | Content | Message] | None = None,
        *,
        stream: bool = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
        """Dispatch to the streaming or non-streaming implementation.

        ``session`` is accepted for interface compatibility but is not
        forwarded to either implementation.
        """
        runner = self._run_stream if stream else self._run
        return runner(messages=messages, **kwargs)

    @staticmethod
    def _normalize(
        messages: str | Content | Message | Sequence[str | Content | Message] | None,
    ) -> list[Message]:
        """Convert any accepted input shape into a list of ``Message`` objects.

        ``Message`` values pass through unchanged; ``Content`` values and
        strings are wrapped in a ``user`` message. ``None`` gives an empty list.
        """

        def as_message(item: str | Content | Message) -> Message:
            if isinstance(item, Message):
                return item
            if isinstance(item, Content):
                return Message(role="user", contents=[item])
            return Message(role="user", contents=[Content.from_text(text=item)])

        if messages is None:
            return []
        # A lone str/Message/Content is one message; anything else is iterated.
        if isinstance(messages, (str, Message, Content)):
            return [as_message(messages)]
        return [as_message(item) for item in messages]

    @staticmethod
    def _approval_responses_in(messages: list[Message]) -> list[Content]:
        """Collect every content of type ``function_approval_response`` from ``messages``."""
        found: list[Content] = []
        for message in messages:
            for content in message.contents:
                if content.type == "function_approval_response":
                    found.append(content)
        return found

    async def _run(
        self,
        messages: str | Content | Message | Sequence[str | Content | Message] | None = None,
        **kwargs: Any,
    ) -> AgentResponse:
        """Non-streaming run: final text if the input has an approval response, else an approval request."""
        incoming = self._normalize(messages)
        self.last_run_messages = incoming
        self.run_count += 1

        if self._approval_responses_in(incoming):
            reply = Content.from_text(text=self._final_text)
        else:
            reply = self._build_approval_request()
        return AgentResponse(messages=[Message("assistant", [reply])])

    def _run_stream(
        self,
        messages: str | Content | Message | Sequence[str | Content | Message] | None = None,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse]:
        """Streaming run: yields a single update with the same content choice as ``_run``."""
        incoming = self._normalize(messages)
        self.last_run_messages = incoming
        self.run_count += 1
        has_approval = bool(self._approval_responses_in(incoming))

        async def _updates() -> AsyncIterator[AgentResponseUpdate]:
            # The content is built only once the stream is iterated.
            if has_approval:
                content = Content.from_text(text=self._final_text)
            else:
                content = self._build_approval_request()
            yield AgentResponseUpdate(
                contents=[content],
                role="assistant",
                author_name=self.name,
            )

        return ResponseStream(_updates(), finalizer=AgentResponse.from_updates)
def _build_text_workflow_agent(text: str) -> WorkflowAgent:
    """Build a minimal ``WorkflowAgent`` whose inner agent emits a fixed text.

    The workflow has two nodes: a ``start`` executor that forwards the
    incoming messages as an ``AgentExecutorRequest``, and an inner agent
    that always answers with ``text`` (in both streaming and non-streaming
    mode).
    """

    class _TextAgent(SupportsAgentRun):
        """Agent stub that replies with a constant text regardless of input."""

        def __init__(self, name: str, text: str) -> None:
            self.id = str(uuid.uuid4())
            self.name = name
            self.description: str | None = None
            self._text = text

        def create_session(self, **kwargs: Any) -> AgentSession:
            return AgentSession()

        def get_session(self, *, service_session_id: str, **kwargs: Any) -> AgentSession:
            return AgentSession()

        @overload
        def run(
            self,
            messages: Any = ...,
            *,
            stream: Literal[False] = ...,
            session: AgentSession | None = ...,
            **kwargs: Any,
        ) -> Awaitable[AgentResponse[Any]]: ...

        @overload
        def run(
            self,
            messages: Any = ...,
            *,
            stream: Literal[True],
            session: AgentSession | None = ...,
            **kwargs: Any,
        ) -> ResponseStream[AgentResponseUpdate, AgentResponse[Any]]: ...

        def run(
            self,
            messages: Any = None,
            *,
            stream: bool = False,
            session: AgentSession | None = None,
            **kwargs: Any,
        ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
            # Capture the values now so the coroutine/generator below do not
            # read them from ``self`` later.
            fixed_text = self._text
            author = self.name

            if stream:

                async def _single_update() -> AsyncIterator[AgentResponseUpdate]:
                    yield AgentResponseUpdate(
                        contents=[Content.from_text(text=fixed_text)],
                        role="assistant",
                        author_name=author,
                    )

                return ResponseStream(_single_update(), finalizer=AgentResponse.from_updates)

            async def _response() -> AgentResponse:
                return AgentResponse(messages=[Message("assistant", [Content.from_text(text=fixed_text)])])

            return _response()

    text_agent = _TextAgent("text-agent", text)

    @executor
    async def start(messages: list[Message], ctx: WorkflowContext[AgentExecutorRequest]) -> None:
        await ctx.send_message(AgentExecutorRequest(messages=messages, should_respond=True))

    builder = WorkflowBuilder(start_executor=start)
    workflow = builder.add_edge(start, text_agent).build()
    return WorkflowAgent(workflow=workflow, name="Text Workflow Agent")
def _build_approval_workflow_agent(
    *,
    approval_request_id: str,
    tool_name: str = "delete_file",
    tool_arguments: dict[str, Any] | None = None,
    final_text: str = "done",
) -> tuple[WorkflowAgent, _ToolApprovalWorkflowAgentMock]:
    """Build a ``WorkflowAgent`` whose inner agent emits a tool approval request.

    Args:
        approval_request_id: The single approval request id handed to the mock.
        tool_name: Name of the tool the mock asks approval for.
        tool_arguments: Arguments of that tool call; a missing or empty
            mapping falls back to ``{"path": "/tmp/secret.txt"}``.
        final_text: Text the mock returns once it receives an approval response.

    Returns:
        The workflow agent and the inner mock, so tests can inspect the
        mock's recorded state.
    """
    arguments = tool_arguments if tool_arguments else {"path": "/tmp/secret.txt"}
    approval_agent = _ToolApprovalWorkflowAgentMock(
        name="approval-agent",
        tool_name=tool_name,
        tool_arguments=arguments,
        approval_request_ids=[approval_request_id],
        final_text=final_text,
    )

    @executor
    async def start(messages: list[Message], ctx: WorkflowContext[AgentExecutorRequest]) -> None:
        await ctx.send_message(AgentExecutorRequest(messages=messages, should_respond=True))

    builder = WorkflowBuilder(start_executor=start)
    workflow = builder.add_edge(start, approval_agent).build()
    return WorkflowAgent(workflow=workflow, name="Approval Workflow Agent"), approval_agent
class TestWorkflowAgentHosting:
    """End-to-end HTTP tests for ``ResponsesHostServer`` hosting a ``WorkflowAgent``.

    These tests drive ``_handle_inner_workflow`` through the ASGI stack:
    they exercise checkpoint write/restore (multi-turn) and the
    tool-approval round-trip path, which is the primary differentiator
    relative to the regular agent path.
    """

    @staticmethod
    def _approval_reply_payload(
        approval_request_id: str,
        previous_response_id: str,
        *,
        approve: bool,
        stream: bool,
    ) -> dict[str, Any]:
        """Build the second-turn request body carrying an ``mcp_approval_response``."""
        return {
            "model": "test-model",
            "input": [
                {
                    "type": "mcp_approval_response",
                    "approval_request_id": approval_request_id,
                    "approve": approve,
                }
            ],
            "stream": stream,
            "previous_response_id": previous_response_id,
        }

    @staticmethod
    def _received_approval_responses(mock_agent: _ToolApprovalWorkflowAgentMock) -> list[Content]:
        """Return the ``function_approval_response`` contents from the mock's last run."""
        return [
            content
            for message in mock_agent.last_run_messages
            for content in message.contents
            if content.type == "function_approval_response"
        ]

    async def test_basic_text_response(self) -> None:
        """Non-streaming: the workflow's text appears as ``output_text`` in a message item."""
        server = _make_server(_build_text_workflow_agent("hello from workflow"))

        resp = await _post(server, input_text="hi", stream=False)

        assert resp.status_code == 200
        body = resp.json()
        assert body["status"] == "completed"

        output_texts = [
            part.get("text")
            for item in body["output"]
            if item["type"] == "message"
            for part in item.get("content", [])
            if part.get("type") == "output_text"
        ]
        assert "hello from workflow" in output_texts, f"Expected workflow output text in {body['output']}"

    async def test_basic_text_response_streaming(self) -> None:
        """Streaming: the workflow's text arrives via ``response.output_text`` events."""
        server = _make_server(_build_text_workflow_agent("hello stream"))

        resp = await _post(server, input_text="hi", stream=True)

        assert resp.status_code == 200
        events = _parse_sse_events(resp.text)
        types = _sse_event_types(events)
        assert types[0] == "response.created"
        assert types[-1] == "response.completed"
        assert "response.output_text.delta" in types

        finished_texts = [event["data"]["text"] for event in events if event["event"] == "response.output_text.done"]
        assert any(text == "hello stream" for text in finished_texts)

    async def test_non_streaming_emits_mcp_approval_request_and_persists_to_storage(self) -> None:
        """Non-streaming: one ``mcp_approval_request`` is emitted and stored under its wire id."""
        workflow_agent, mock_agent = _build_approval_workflow_agent(approval_request_id="apr_wf_ns")
        server = _make_server(workflow_agent)

        resp = await _post(server, stream=False)

        assert resp.status_code == 200
        body = resp.json()
        assert body["status"] == "completed"

        approval_items = [item for item in body["output"] if item["type"] == "mcp_approval_request"]
        assert len(approval_items) == 1
        (approval_item,) = approval_items
        assert approval_item["name"] == "delete_file"
        assert approval_item["server_label"] == "test_server"

        # The id surfaced over the wire is generated by the response stream
        # builder; the original approval ``Content`` (carrying the inner
        # ``function_call``) must be persisted under that id so the next
        # turn can reconstruct it.
        stored = await server._approval_storage.load_approval_request(  # pyright: ignore[reportPrivateUsage]
            approval_item["id"]
        )
        assert stored.type == "function_approval_request"
        assert stored.function_call.name == "delete_file"  # type: ignore[attr-defined]

        assert mock_agent.run_count == 1

    async def test_streaming_emits_mcp_approval_request_and_persists_to_storage(self) -> None:
        """Streaming: the approval request announced over SSE is stored under its wire id."""
        workflow_agent, mock_agent = _build_approval_workflow_agent(approval_request_id="apr_wf_st")
        server = _make_server(workflow_agent)

        resp = await _post(server, stream=True)

        assert resp.status_code == 200
        events = _parse_sse_events(resp.text)
        types = _sse_event_types(events)
        assert types[0] == "response.created"
        assert types[-1] == "response.completed"

        # Take the id of the first ``mcp_approval_request`` item announced by a
        # ``response.output_item.added`` event.
        added_items = (
            event["data"].get("item") or {} for event in events if event["event"] == "response.output_item.added"
        )
        approval_request_id: str | None = next(
            (item.get("id") for item in added_items if item.get("type") == "mcp_approval_request"),
            None,
        )
        assert approval_request_id is not None

        stored = await server._approval_storage.load_approval_request(  # pyright: ignore[reportPrivateUsage]
            approval_request_id
        )
        assert stored.type == "function_approval_request"

        assert mock_agent.run_count == 1

    async def test_round_trip_approval_response_resumes_workflow_agent(self) -> None:
        """Two-turn HTTP round-trip:

        Turn 1 emits ``mcp_approval_request`` and writes a workflow
        checkpoint under the response id. Turn 2 sends the
        ``mcp_approval_response`` with ``previous_response_id`` set, so the
        host restores the checkpoint, the WorkflowAgent routes the
        approval response back to the paused inner agent, and the inner
        agent emits the final assistant text.
        """
        workflow_agent, mock_agent = _build_approval_workflow_agent(
            approval_request_id="apr_wf_rt",
            final_text="done with approval",
        )
        server = _make_server(workflow_agent)

        first = await _post(server, stream=False)
        assert first.status_code == 200
        first_body = first.json()
        first_response_id = first_body["id"]
        approval_items = [item for item in first_body["output"] if item["type"] == "mcp_approval_request"]
        assert len(approval_items) == 1
        approval_request_id = approval_items[0]["id"]
        assert mock_agent.run_count == 1

        second_payload = self._approval_reply_payload(
            approval_request_id,
            first_response_id,
            approve=True,
            stream=False,
        )
        second = await _post_json(server, second_payload)
        assert second.status_code == 200
        second_body = second.json()
        assert second_body["status"] == "completed"

        # The inner agent must have been resumed (restore replay + new turn).
        # Restore call is a no-op for the mock (no input); the new-turn call
        # delivers the approval response, so run_count grows by at least 1.
        assert mock_agent.run_count >= 2

        # The final assistant text from the resumed inner agent surfaces in
        # the HTTP output.
        output_texts = [
            part.get("text", "")
            for item in second_body["output"]
            if item["type"] == "message"
            for part in item.get("content", [])
            if part.get("type") == "output_text"
        ]
        assert any("done with approval" in text for text in output_texts), (
            f"expected resumed workflow output, got {second_body['output']}"
        )

        # The new-turn invocation of the inner agent must have received the
        # approval response routed back through WorkflowAgent.
        approval_responses = self._received_approval_responses(mock_agent)
        assert len(approval_responses) == 1
        assert approval_responses[0].approved is True  # type: ignore[attr-defined]

    async def test_round_trip_approval_response_streaming(self) -> None:
        """Streaming variant of the round-trip: turn 2 is requested with
        ``stream=true`` and surfaces the resumed text as SSE events."""
        workflow_agent, mock_agent = _build_approval_workflow_agent(
            approval_request_id="apr_wf_rt_st",
            final_text="streamed-done",
        )
        server = _make_server(workflow_agent)

        first = await _post(server, stream=False)
        first_body = first.json()
        first_response_id = first_body["id"]
        approval_request_id = next(
            item["id"] for item in first_body["output"] if item["type"] == "mcp_approval_request"
        )

        second = await _post_json(
            server,
            self._approval_reply_payload(
                approval_request_id,
                first_response_id,
                approve=True,
                stream=True,
            ),
        )
        assert second.status_code == 200

        events = _parse_sse_events(second.text)
        types = _sse_event_types(events)
        assert types[0] == "response.created"
        assert types[-1] == "response.completed"

        finished_texts = [event["data"]["text"] for event in events if event["event"] == "response.output_text.done"]
        assert any("streamed-done" in text for text in finished_texts)
        assert mock_agent.run_count >= 2

    async def test_round_trip_approval_response_rejected(self) -> None:
        """Sending ``approve=False`` must surface as ``approved=False`` to the
        inner agent on resume."""
        workflow_agent, mock_agent = _build_approval_workflow_agent(
            approval_request_id="apr_wf_reject",
            final_text="acknowledged",
        )
        server = _make_server(workflow_agent)

        first = await _post(server, stream=False)
        first_body = first.json()
        first_response_id = first_body["id"]
        approval_request_id = next(
            item["id"] for item in first_body["output"] if item["type"] == "mcp_approval_request"
        )

        second = await _post_json(
            server,
            self._approval_reply_payload(
                approval_request_id,
                first_response_id,
                approve=False,
                stream=False,
            ),
        )
        assert second.status_code == 200

        approval_responses = self._received_approval_responses(mock_agent)
        assert len(approval_responses) == 1
        assert approval_responses[0].approved is False  # type: ignore[attr-defined]
# endregion