Merge branch 'main' into local-branch-fix-workflow-as-agent-pending-request-handling

This commit is contained in:
Tao Chen
2026-06-02 15:09:28 -07:00
Unverified
28 changed files with 5306 additions and 4407 deletions
+8 -1
View File
@@ -27,7 +27,7 @@ Status is grouped into these buckets:
| `agent-framework-claude` | `python/packages/claude` | `beta` |
| `agent-framework-copilotstudio` | `python/packages/copilotstudio` | `beta` |
| `agent-framework-core` | `python/packages/core` | `released` |
| `agent-framework-declarative` | `python/packages/declarative` | `beta` |
| `agent-framework-declarative` | `python/packages/declarative` | `rc` |
| `agent-framework-devui` | `python/packages/devui` | `beta` |
| `agent-framework-durabletask` | `python/packages/durabletask` | `beta` |
| `agent-framework-foundry` | `python/packages/foundry` | `released` |
@@ -58,6 +58,13 @@ listed below.
### Experimental features
#### `DECLARATIVE_AGENTS`
- `agent-framework-declarative`: declarative agent loading APIs from
`agent_framework_declarative`, including `AgentFactory`,
`DeclarativeLoaderError`, `ProviderLookupError`, and `ProviderTypeMapping`
from `agent_framework_declarative/_loader.py`
#### `EVALS`
- `agent-framework-core`: exported evaluation APIs from `agent_framework`, including
@@ -50,6 +50,7 @@ class ExperimentalFeature(str, Enum):
on enum membership or attribute presence over time.
"""
DECLARATIVE_AGENTS = "DECLARATIVE_AGENTS"
EVALS = "EVALS"
FILE_HISTORY = "FILE_HISTORY"
FIDES = "FIDES"
@@ -498,14 +498,34 @@ def _get_exporters_from_env(
# Get base endpoint
base_endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
# Get signal-specific endpoints (these override base endpoint)
traces_endpoint = os.getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT") or base_endpoint
metrics_endpoint = os.getenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT") or base_endpoint
logs_endpoint = os.getenv("OTEL_EXPORTER_OTLP_LOGS_ENDPOINT") or base_endpoint
# Get signal-specific endpoints (these override base endpoint and are used verbatim)
traces_endpoint_specific = os.getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT")
metrics_endpoint_specific = os.getenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT")
logs_endpoint_specific = os.getenv("OTEL_EXPORTER_OTLP_LOGS_ENDPOINT")
# Get protocol (default is grpc)
protocol = os.getenv("OTEL_EXPORTER_OTLP_PROTOCOL", "grpc").lower()
# Per the OTel spec, OTEL_EXPORTER_OTLP_ENDPOINT is a *base* URL for HTTP — the SDK
# auto-appends /v1/{traces,metrics,logs} when it reads the env var directly. The
# signal-specific endpoint env vars are *full* URLs used verbatim. Because we read
# the env vars here and forward them as the ``endpoint=`` constructor argument
# (which the SDK always treats as a full URL), we must replicate the auto-append
# ourselves for HTTP when falling back to the base endpoint. For gRPC, the base
# endpoint is used as-is.
traces_endpoint: str | None
metrics_endpoint: str | None
logs_endpoint: str | None
if protocol in ("http/protobuf", "http") and base_endpoint:
base_for_http = base_endpoint.rstrip("/")
traces_endpoint = traces_endpoint_specific or f"{base_for_http}/v1/traces"
metrics_endpoint = metrics_endpoint_specific or f"{base_for_http}/v1/metrics"
logs_endpoint = logs_endpoint_specific or f"{base_for_http}/v1/logs"
else:
traces_endpoint = traces_endpoint_specific or base_endpoint
metrics_endpoint = metrics_endpoint_specific or base_endpoint
logs_endpoint = logs_endpoint_specific or base_endpoint
# Get base headers
base_headers_str = os.getenv("OTEL_EXPORTER_OTLP_HEADERS", "")
base_headers = _parse_headers(base_headers_str)
@@ -761,6 +761,115 @@ def test_get_exporters_from_env_missing_grpc_dependency(monkeypatch):
_get_exporters_from_env()
# region Test OTLP endpoint computation (base-URL auto-append for HTTP)
def test_get_exporters_from_env_http_base_endpoint_appends_signal_paths(monkeypatch):
"""OTEL_EXPORTER_OTLP_ENDPOINT is a base URL for HTTP; SDK auto-appends
/v1/{traces,metrics,logs}. Because we read the env var and forward it as the
constructor ``endpoint=`` arg (which the SDK treats as a full URL), we must
replicate the auto-append ourselves.
"""
from unittest.mock import patch
from agent_framework import observability
monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318")
monkeypatch.setenv("OTEL_EXPORTER_OTLP_PROTOCOL", "http/protobuf")
for key in (
"OTEL_EXPORTER_OTLP_TRACES_ENDPOINT",
"OTEL_EXPORTER_OTLP_METRICS_ENDPOINT",
"OTEL_EXPORTER_OTLP_LOGS_ENDPOINT",
):
monkeypatch.delenv(key, raising=False)
with patch.object(observability, "_create_otlp_exporters", return_value=[]) as create:
observability._get_exporters_from_env()
kwargs = create.call_args.kwargs
assert kwargs["protocol"] == "http/protobuf"
assert kwargs["traces_endpoint"] == "http://localhost:4318/v1/traces"
assert kwargs["metrics_endpoint"] == "http://localhost:4318/v1/metrics"
assert kwargs["logs_endpoint"] == "http://localhost:4318/v1/logs"
def test_get_exporters_from_env_http_base_endpoint_trailing_slash(monkeypatch):
"""A trailing slash on the base endpoint should not produce a doubled slash."""
from unittest.mock import patch
from agent_framework import observability
monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318/")
monkeypatch.setenv("OTEL_EXPORTER_OTLP_PROTOCOL", "http/protobuf")
for key in (
"OTEL_EXPORTER_OTLP_TRACES_ENDPOINT",
"OTEL_EXPORTER_OTLP_METRICS_ENDPOINT",
"OTEL_EXPORTER_OTLP_LOGS_ENDPOINT",
):
monkeypatch.delenv(key, raising=False)
with patch.object(observability, "_create_otlp_exporters", return_value=[]) as create:
observability._get_exporters_from_env()
kwargs = create.call_args.kwargs
assert kwargs["traces_endpoint"] == "http://localhost:4318/v1/traces"
assert kwargs["metrics_endpoint"] == "http://localhost:4318/v1/metrics"
assert kwargs["logs_endpoint"] == "http://localhost:4318/v1/logs"
def test_get_exporters_from_env_http_signal_specific_used_verbatim(monkeypatch):
"""Signal-specific endpoint env vars are full URLs and must be used verbatim,
even when a base endpoint is also set.
"""
from unittest.mock import patch
from agent_framework import observability
monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318")
monkeypatch.setenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", "http://traces.example.com/custom/path")
monkeypatch.setenv("OTEL_EXPORTER_OTLP_PROTOCOL", "http/protobuf")
for key in (
"OTEL_EXPORTER_OTLP_METRICS_ENDPOINT",
"OTEL_EXPORTER_OTLP_LOGS_ENDPOINT",
):
monkeypatch.delenv(key, raising=False)
with patch.object(observability, "_create_otlp_exporters", return_value=[]) as create:
observability._get_exporters_from_env()
kwargs = create.call_args.kwargs
# Signal-specific is verbatim — no path appended
assert kwargs["traces_endpoint"] == "http://traces.example.com/custom/path"
# Others fall back to base, with path appended
assert kwargs["metrics_endpoint"] == "http://localhost:4318/v1/metrics"
assert kwargs["logs_endpoint"] == "http://localhost:4318/v1/logs"
def test_get_exporters_from_env_grpc_base_endpoint_unchanged(monkeypatch):
"""For gRPC, the base endpoint applies to all signals as-is (no path append)."""
from unittest.mock import patch
from agent_framework import observability
monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")
monkeypatch.setenv("OTEL_EXPORTER_OTLP_PROTOCOL", "grpc")
for key in (
"OTEL_EXPORTER_OTLP_TRACES_ENDPOINT",
"OTEL_EXPORTER_OTLP_METRICS_ENDPOINT",
"OTEL_EXPORTER_OTLP_LOGS_ENDPOINT",
):
monkeypatch.delenv(key, raising=False)
with patch.object(observability, "_create_otlp_exporters", return_value=[]) as create:
observability._get_exporters_from_env()
kwargs = create.call_args.kwargs
assert kwargs["protocol"] == "grpc"
assert kwargs["traces_endpoint"] == "http://localhost:4317"
assert kwargs["metrics_endpoint"] == "http://localhost:4317"
assert kwargs["logs_endpoint"] == "http://localhost:4317"
# region Test create_resource
+12
View File
@@ -6,6 +6,18 @@ Please install this package via pip:
pip install agent-framework-declarative --pre
```
## Release stage
This package ships at two different stability levels:
- **Declarative workflows** (`WorkflowFactory`, executors, handlers, and the
`_workflows` surface) are at **release-candidate** stability and may receive only
minor refinements before GA.
- **Declarative agents** (`AgentFactory` and the YAML agent loading/parsing path:
`DeclarativeLoaderError`, `ProviderLookupError`, `ProviderTypeMapping`) are
**experimental** and may change or be removed in future versions without notice.
Using any of these symbols emits an `ExperimentalWarning` on first use.
## Declarative features
The declarative packages provides support for building agents based on a declarative yaml specification.
@@ -1,5 +1,18 @@
# Copyright (c) Microsoft. All rights reserved.
"""Declarative specification support for Microsoft Agent Framework.
Release stage:
* The declarative-workflows surface (``WorkflowFactory``, executors, handlers,
etc.) is at release-candidate stability.
* The declarative-agents surface (``AgentFactory`` and the YAML agent
loading/parsing path: ``DeclarativeLoaderError``, ``ProviderLookupError``,
``ProviderTypeMapping``) is *experimental* and may change or be removed in
future versions without notice. Using these symbols emits an
``ExperimentalWarning`` on first use.
"""
from importlib import metadata
from ._loader import AgentFactory, DeclarativeLoaderError, ProviderLookupError, ProviderTypeMapping
@@ -15,6 +15,10 @@ from agent_framework import (
from agent_framework import (
FunctionTool as AFFunctionTool,
)
from agent_framework._feature_stage import ( # type: ignore[reportPrivateUsage]
ExperimentalFeature,
experimental,
)
from agent_framework.exceptions import AgentException
from dotenv import load_dotenv
@@ -43,6 +47,7 @@ else:
from typing_extensions import TypedDict # type: ignore # pragma: no cover
@experimental(feature_id=ExperimentalFeature.DECLARATIVE_AGENTS)
class ProviderTypeMapping(TypedDict, total=True):
package: str
name: str
@@ -118,18 +123,21 @@ PROVIDER_TYPE_OBJECT_MAPPING: dict[str, ProviderTypeMapping] = {
}
@experimental(feature_id=ExperimentalFeature.DECLARATIVE_AGENTS)
class DeclarativeLoaderError(AgentException):
"""Exception raised for errors in the declarative loader."""
pass
@experimental(feature_id=ExperimentalFeature.DECLARATIVE_AGENTS)
class ProviderLookupError(DeclarativeLoaderError):
"""Exception raised for errors in provider type lookup."""
pass
@experimental(feature_id=ExperimentalFeature.DECLARATIVE_AGENTS)
class AgentFactory:
"""Factory for creating Agent instances from declarative YAML definitions.
+3 -2
View File
@@ -4,7 +4,7 @@ description = "Declarative specification support for Microsoft Agent Framework."
authors = [{ name = "Microsoft", email = "af-support@microsoft.com"}]
readme = "README.md"
requires-python = ">=3.10"
version = "1.0.0b260528"
version = "1.0.0rc1"
license-files = ["LICENSE"]
urls.homepage = "https://aka.ms/agent-framework"
urls.source = "https://github.com/microsoft/agent-framework/tree/main/python"
@@ -49,7 +49,8 @@ addopts = "-ra -q -r fEX"
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
filterwarnings = [
"ignore:Support for class-based `config` is deprecated:DeprecationWarning:pydantic.*"
"ignore:Support for class-based `config` is deprecated:DeprecationWarning:pydantic.*",
"ignore::agent_framework._feature_stage.ExperimentalWarning",
]
timeout = 120
markers = [
@@ -351,6 +351,7 @@ class RawFoundryAgentChatClient( # type: ignore[misc]
if _uses_foundry_agent_session(conversation_id):
run_options.pop("previous_response_id", None)
run_options.pop("conversation", None)
run_options.pop("model", None)
extra_body["agent_session_id"] = conversation_id
# Non-preview Prompt/Hosted Agent calls need agent_reference in the request body to
# tell the Responses API which Foundry agent (and version) is in use, since ``model``
@@ -366,7 +367,6 @@ class RawFoundryAgentChatClient( # type: ignore[misc]
# Strip tools from request body - Foundry API rejects requests with both
# agent endpoint and tools present. FunctionTools are invoked client-side
# by the function invocation layer, not sent to the service.
run_options.pop("model", None)
if not self.allow_preview:
run_options.pop("tools", None)
run_options.pop("tool_choice", None)
@@ -203,7 +203,7 @@ async def test_raw_foundry_agent_chat_client_prepare_options_accepts_function_to
async def test_raw_foundry_agent_chat_client_prepare_options_strips_client_side_fields() -> None:
"""Test that _prepare_options strips model and tool-loop fields from run_options."""
"""Test that _prepare_options strips tool-loop fields but preserves model for non-session requests."""
mock_project = MagicMock()
mock_openai = MagicMock()
@@ -235,16 +235,49 @@ async def test_raw_foundry_agent_chat_client_prepare_options_strips_client_side_
options={"tools": [my_func]},
)
assert "model" not in result
# model is preserved for non-session (PromptAgent) requests
assert result["model"] == "gpt-4.1"
assert "tools" not in result
assert "tool_choice" not in result
assert "parallel_tool_calls" not in result
# agent_reference is required so the Responses API can resolve model server-side; see #5582.
assert result == {
"model": "gpt-4.1",
"extra_body": {"agent_reference": {"name": "test-agent", "type": "agent_reference"}},
}
async def test_raw_foundry_agent_chat_client_prepare_options_strips_model_for_hosted_session() -> None:
"""Test that model is stripped when using a hosted agent session (not a PromptAgent)."""
mock_project = MagicMock()
mock_openai = MagicMock()
mock_project.get_openai_client.return_value = mock_openai
client = RawFoundryAgentChatClient(
project_client=mock_project,
agent_name="test-agent",
)
with patch(
"agent_framework_openai._chat_client.RawOpenAIChatClient._prepare_options",
new_callable=AsyncMock,
return_value={
"model": "gpt-4.1",
"previous_response_id": "resp_abc",
},
):
result = await client._prepare_options(
messages=[Message(role="user", contents="hi")],
options={"conversation_id": "agent-session-123"},
)
assert "model" not in result
assert "previous_response_id" not in result
assert result["extra_body"]["agent_session_id"] == "agent-session-123"
assert result["extra_body"]["agent_reference"] == {"name": "test-agent", "type": "agent_reference"}
async def test_raw_foundry_agent_chat_client_prepare_options_injects_agent_reference_first_turn() -> None:
"""First-turn (no conversation_id) Prompt Agent calls must carry agent_reference in extra_body.
@@ -272,7 +305,6 @@ async def test_raw_foundry_agent_chat_client_prepare_options_injects_agent_refer
options={},
)
assert "model" not in result
assert result["extra_body"] == {
"agent_reference": {"name": "test-agent", "type": "agent_reference", "version": "2"},
}
@@ -333,7 +365,8 @@ async def test_raw_foundry_agent_chat_client_prepare_options_skips_agent_referen
options={},
)
assert "model" not in result
# model is preserved for non-session requests (platform tolerates it for hosted agents)
assert result["model"] == "gpt-4.1"
# No extra_body at all is the cleanest signal — agent_reference must not be injected here.
assert "extra_body" not in result
@@ -363,6 +396,39 @@ async def test_raw_foundry_agent_chat_client_prepare_options_respects_caller_age
assert result["extra_body"]["agent_reference"] == caller_reference
async def test_raw_foundry_agent_chat_client_prepare_options_preserves_model_for_resp_continuation() -> None:
"""Test that model is preserved when conversation_id is a resp_* continuation (HostedAgent v1 / v2-no-session)."""
mock_project = MagicMock()
mock_openai = MagicMock()
mock_project.get_openai_client.return_value = mock_openai
client = RawFoundryAgentChatClient(
project_client=mock_project,
agent_name="test-agent",
)
with patch(
"agent_framework_openai._chat_client.RawOpenAIChatClient._prepare_options",
new_callable=AsyncMock,
return_value={
"model": "gpt-4.1",
"previous_response_id": "resp_abc123",
},
):
result = await client._prepare_options(
messages=[Message(role="user", contents="hi")],
options={"conversation_id": "resp_abc123"},
)
# model preserved — resp_* is standard Responses API continuity, not a hosted session
assert result["model"] == "gpt-4.1"
# previous_response_id preserved — not stripped outside hosted session path
assert result["previous_response_id"] == "resp_abc123"
# no agent_session_id injected
assert "extra_body" not in result or "agent_session_id" not in result.get("extra_body", {})
async def test_raw_foundry_agent_chat_client_prepare_options_maps_agent_session_id_to_extra_body() -> None:
"""Test that service_session_id is forwarded as agent_session_id for hosted sessions."""
@@ -9,7 +9,7 @@ import logging
import os
import tempfile
import threading
from collections.abc import AsyncIterable, AsyncIterator, Generator, Sequence
from collections.abc import AsyncIterable, AsyncIterator, Generator, Mapping, Sequence
from contextlib import AbstractAsyncContextManager, AsyncExitStack, suppress
from dataclasses import asdict, is_dataclass
from pathlib import Path
@@ -472,14 +472,12 @@ class ResponsesHostServer(ResponsesAgentServerHost):
# Run the agent in non-streaming mode
response = await self._agent.run(stream=False, **run_kwargs) # type: ignore[reportUnknownMemberType]
for message in response.messages:
for content in message.contents:
async for item in _to_outputs(
response_event_stream,
content,
approval_storage=self._approval_storage,
):
yield item
async for item in _to_outputs_for_messages(
response_event_stream,
response.messages,
approval_storage=self._approval_storage,
):
yield item
yield response_event_stream.emit_completed()
else:
if tracker is None: # pragma: no cover - defensive, set above
@@ -620,12 +618,8 @@ class ResponsesHostServer(ResponsesAgentServerHost):
checkpoint_storage=write_storage,
)
for message in response.messages:
for content in message.contents:
async for item in _to_outputs(
response_event_stream, content, approval_storage=self._approval_storage
):
yield item
async for item in _to_outputs_for_messages(response_event_stream, response.messages):
yield item
await self._delete_not_latest_checkpoints(write_storage, self._agent.workflow.name)
yield response_event_stream.emit_completed()
@@ -733,7 +727,7 @@ class _OutputItemTracker:
yield self._fc_builder.emit_arguments_delta(args_str)
elif content.type == "mcp_server_tool_call" and content.tool_name:
key = f"{content.server_name or 'default'}::{content.tool_name}"
key = content.call_id or f"{content.server_name or 'default'}::{content.tool_name}"
if self._active_type != "mcp_server_tool_call" or self._active_id != key:
yield from self._close()
yield from self._open_mcp_call(content)
@@ -742,6 +736,24 @@ class _OutputItemTracker:
if self._mcp_builder is not None:
yield self._mcp_builder.emit_arguments_delta(args_str)
elif (
content.type == "mcp_server_tool_result"
and self._active_type == "mcp_server_tool_call"
and self._mcp_builder is not None
and content.call_id is not None
and content.call_id == self._mcp_builder.item_id
):
accumulated = "".join(self._accumulated)
yield self._mcp_builder.emit_arguments_done(accumulated)
yield self._mcp_builder.emit_completed()
yield self._mcp_builder.emit_done(output=_stringify_mcp_output(content.output))
self._mcp_builder = None
self._active_type = None
self._active_id = None
self._accumulated.clear()
self.needs_async = False
return
else:
yield from self._close()
self.needs_async = True
@@ -781,9 +793,10 @@ class _OutputItemTracker:
self._mcp_builder = self._stream.add_output_item_mcp_call(
server_label=content.server_name or "default",
name=content.tool_name or "",
item_id=content.call_id,
)
self._active_type = "mcp_server_tool_call"
self._active_id = f"{content.server_name or 'default'}::{content.tool_name}"
self._active_id = content.call_id or f"{content.server_name or 'default'}::{content.tool_name}"
yield self._mcp_builder.emit_added()
def _close(self) -> Generator[ResponseStreamEvent]:
@@ -931,16 +944,19 @@ async def _item_to_message(item: Item, *, approval_storage: ApprovalStorage | No
if item.type == "mcp_call":
mcp = cast(ItemMcpToolCall, item)
contents = [
Content.from_mcp_server_tool_call(
mcp.id,
mcp.name,
server_name=mcp.server_label,
arguments=mcp.arguments,
)
]
if getattr(mcp, "output", None) is not None:
contents.append(Content.from_mcp_server_tool_result(call_id=mcp.id, output=mcp.output))
return Message(
role="assistant",
contents=[
Content.from_mcp_server_tool_call(
mcp.id,
mcp.name,
server_name=mcp.server_label,
arguments=mcp.arguments,
)
],
contents=contents,
)
if item.type == "mcp_approval_request":
@@ -1201,16 +1217,19 @@ async def _output_item_to_message(item: OutputItem, *, approval_storage: Approva
if item.type == "mcp_call":
mcp = cast(OutputItemMcpToolCall, item)
contents = [
Content.from_mcp_server_tool_call(
mcp.id,
mcp.name,
server_name=mcp.server_label,
arguments=mcp.arguments,
)
]
if getattr(mcp, "output", None) is not None:
contents.append(Content.from_mcp_server_tool_result(call_id=mcp.id, output=mcp.output))
return Message(
role="assistant",
contents=[
Content.from_mcp_server_tool_call(
mcp.id,
mcp.name,
server_name=mcp.server_label,
arguments=mcp.arguments,
)
],
contents=contents,
)
if item.type == "mcp_approval_request":
@@ -1587,6 +1606,7 @@ async def _to_outputs(
mcp_call = stream.add_output_item_mcp_call(
server_label=content.server_name or "default",
name=content.tool_name or "",
item_id=content.call_id,
)
yield mcp_call.emit_added()
async for event in mcp_call.aarguments(_arguments_to_str(content.arguments)):
@@ -1661,4 +1681,91 @@ async def _to_outputs(
logger.warning(f"Content type '{content.type}' is not supported yet. This is usually safe to ignore.")
def _stringify_mcp_output(output: Any) -> str:
"""Convert hosted MCP output payloads into the string shape expected by mcp_call.output."""
if output is None:
return ""
if isinstance(output, str):
return output
if isinstance(output, Mapping):
text = cast(Any, output).get("text")
if isinstance(text, str):
return text
return json.dumps(output, default=str)
if isinstance(output, Sequence) and not isinstance(output, (str, bytes, bytearray)):
parts: list[str] = []
entries = cast(Sequence[object], output)
for entry in entries:
if isinstance(entry, Content) and entry.type == "text":
parts.append(entry.text or "")
continue
parts.append(_stringify_mcp_output(entry))
return "".join(parts)
return str(output)
def _emit_completed_mcp_call(
stream: ResponseEventStream,
call_content: Content,
*,
arguments: str,
output: str,
) -> Generator[ResponseStreamEvent]:
"""Emit a single completed MCP call item carrying both arguments and output."""
mcp_call = stream.add_output_item_mcp_call(
server_label=call_content.server_name or "default",
name=call_content.tool_name or "",
item_id=call_content.call_id,
)
yield mcp_call.emit_added()
yield mcp_call.emit_arguments_done(arguments)
yield mcp_call.emit_completed()
yield mcp_call.emit_done(output=output)
async def _to_outputs_for_messages(
stream: ResponseEventStream,
messages: Sequence[Message],
*,
approval_storage: ApprovalStorage | None = None,
) -> AsyncIterator[ResponseStreamEvent]:
"""Convert messages to output events with hosted-MCP call/result coalescing.
Parse once in message/content order and emit either:
- a single canonical completed ``mcp_call`` when adjacent hosted MCP
call/result content are encountered, or
- standard output items for all other content types.
"""
pending_mcp_call: Content | None = None
for message in messages:
for content in message.contents:
if pending_mcp_call is not None:
if content.type == "mcp_server_tool_result" and content.call_id == pending_mcp_call.call_id:
for event in _emit_completed_mcp_call(
stream,
pending_mcp_call,
arguments=_arguments_to_str(pending_mcp_call.arguments),
output=_stringify_mcp_output(content.output),
):
yield event
pending_mcp_call = None
continue
async for event in _to_outputs(stream, pending_mcp_call, approval_storage=approval_storage):
yield event
pending_mcp_call = None
if content.type == "mcp_server_tool_call" and content.call_id:
pending_mcp_call = content
continue
async for event in _to_outputs(stream, content, approval_storage=approval_storage):
yield event
if pending_mcp_call is not None:
async for event in _to_outputs(stream, pending_mcp_call, approval_storage=approval_storage):
yield event
# endregion
@@ -25,7 +25,7 @@ classifiers = [
dependencies = [
"agent-framework-core>=1.7.0,<2",
"azure-ai-agentserver-core>=2.0.0b3,<3",
"azure-ai-agentserver-responses>=1.0.0b5,<2",
"azure-ai-agentserver-responses>=1.0.0b7,<2",
"azure-ai-agentserver-invocations>=1.0.0b3,<2",
]
@@ -269,6 +269,50 @@ class TestNonStreaming:
assert "function_call_output" in types
assert "message" in types
async def test_hosted_mcp_call_and_result_persist_as_single_mcp_call(self) -> None:
agent = _make_agent(
response=AgentResponse(
messages=[
Message(
role="assistant",
contents=[
Content.from_mcp_server_tool_call(
call_id="mcp_abc123",
tool_name="search",
server_name="api_specs",
arguments='{"q": "cats"}',
)
],
),
Message(
role="tool",
contents=[
Content.from_mcp_server_tool_result(
call_id="mcp_abc123",
output=[Content.from_text(text="found 10 cats")],
)
],
),
Message(role="assistant", contents=[Content.from_text("I found 10 cats!")]),
]
)
)
server = _make_server(agent)
resp = await _post(server, stream=False)
assert resp.status_code == 200
body = resp.json()
assert body["status"] == "completed"
types = [item["type"] for item in body["output"]]
assert "mcp_call" in types
assert "custom_tool_call_output" not in types
mcp_items = [item for item in body["output"] if item["type"] == "mcp_call"]
assert len(mcp_items) == 1
assert mcp_items[0]["id"] == "mcp_abc123"
assert mcp_items[0]["output"] == "found 10 cats"
async def test_reasoning_content(self) -> None:
agent = _make_agent(
response=AgentResponse(
@@ -626,6 +670,53 @@ class TestStreaming:
assert "response.output_item.added" in types
assert "response.output_item.done" in types
async def test_mcp_tool_call_and_result_streaming_emit_single_completed_mcp_call(self) -> None:
agent = _make_agent(
stream_updates=[
AgentResponseUpdate(
contents=[
Content.from_mcp_server_tool_call(
call_id="mcp_abc123",
tool_name="search",
server_name="api_specs",
arguments='{"q":',
)
],
role="assistant",
),
AgentResponseUpdate(
contents=[
Content.from_mcp_server_tool_call(
call_id="mcp_abc123",
tool_name="search",
server_name="api_specs",
arguments=' "cats"}',
)
],
role="assistant",
),
AgentResponseUpdate(
contents=[
Content.from_mcp_server_tool_result(
call_id="mcp_abc123",
output=[Content.from_text(text="found 10 cats")],
)
],
role="tool",
),
]
)
server = _make_server(agent)
resp = await _post(server, stream=True)
assert resp.status_code == 200
events = _parse_sse_events(resp.text)
done_events = [e for e in events if e["event"] == "response.output_item.done"]
assert len(done_events) == 1
assert done_events[0]["data"]["item"]["type"] == "mcp_call"
assert done_events[0]["data"]["item"]["id"] == "mcp_abc123"
assert done_events[0]["data"]["item"]["output"] == "found 10 cats"
# endregion
@@ -729,6 +820,24 @@ class TestOutputItemToMessage:
assert msg.contents[0].server_name == "my_server"
assert msg.contents[0].tool_name == "search"
async def test_mcp_call_with_output_reconstructs_mcp_result_content(self) -> None:
from azure.ai.agentserver.responses.models import OutputItemMcpToolCall
item = OutputItemMcpToolCall({
"type": "mcp_call",
"id": "mcp-1",
"server_label": "my_server",
"name": "search",
"arguments": '{"q": "test"}',
"output": "found 10 cats",
})
msg = await _output_item_to_message(item)
assert msg.role == "assistant"
assert len(msg.contents) == 2
assert msg.contents[0].type == "mcp_server_tool_call"
assert msg.contents[1].type == "mcp_server_tool_result"
assert msg.contents[1].output == "found 10 cats"
async def test_mcp_approval_request(self) -> None:
from azure.ai.agentserver.responses.models import OutputItemMcpApprovalRequest
@@ -1198,6 +1307,25 @@ class TestItemToMessage:
assert msg.contents[0].server_name == "my_server"
assert msg.contents[0].tool_name == "search"
async def test_mcp_call_with_output_reconstructs_mcp_result_content(self) -> None:
from azure.ai.agentserver.responses.models import ItemMcpToolCall
item = ItemMcpToolCall({
"type": "mcp_call",
"id": "mcp-1",
"server_label": "my_server",
"name": "search",
"arguments": '{"q": "test"}',
"output": "found 10 cats",
})
msg = await _item_to_message(item)
assert msg is not None
assert msg.role == "assistant"
assert len(msg.contents) == 2
assert msg.contents[0].type == "mcp_server_tool_call"
assert msg.contents[1].type == "mcp_server_tool_result"
assert msg.contents[1].output == "found 10 cats"
async def test_mcp_approval_request(self) -> None:
from azure.ai.agentserver.responses.models import ItemMcpApprovalRequest
@@ -1946,6 +2074,71 @@ class TestMultiTurnMixedContent:
assert len(fc_contents) >= 1
assert fc_contents[0].name == "search"
async def test_hosted_mcp_call_round_trip_does_not_orphan_function_call_output(self) -> None:
"""Turn 1 produces hosted MCP call + result, turn 2 must replay both without orphaning output."""
agent = _make_multi_response_agent([
AgentResponse(
messages=[
Message(
role="assistant",
contents=[
Content.from_mcp_server_tool_call(
call_id="mcp_abc123",
tool_name="search",
server_name="api_specs",
arguments='{"q": "cats"}',
)
],
),
Message(
role="tool",
contents=[
Content.from_mcp_server_tool_result(
call_id="mcp_abc123",
output=[Content.from_text(text="found 10 cats")],
)
],
),
Message(role="assistant", contents=[Content.from_text("I found 10 cats!")]),
]
),
AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text("Here are more details")])]),
])
server = _make_server(agent)
resp1 = await _post(server, input_text="Search for cats", stream=False)
assert resp1.status_code == 200
response_id = resp1.json()["id"]
types1 = [item["type"] for item in resp1.json()["output"]]
assert "mcp_call" in types1
assert "custom_tool_call_output" not in types1
resp2 = await _post_json(
server,
{
"model": "test-model",
"input": "Tell me more",
"stream": False,
"previous_response_id": response_id,
},
)
assert resp2.status_code == 200
assert resp2.json()["status"] == "completed"
second_call_messages = agent.run.call_args_list[1].kwargs["messages"]
mcp_call_contents = [c for m in second_call_messages for c in m.contents if c.type == "mcp_server_tool_call"]
mcp_result_contents = [
c for m in second_call_messages for c in m.contents if c.type == "mcp_server_tool_result"
]
function_result_contents = [c for m in second_call_messages for c in m.contents if c.type == "function_result"]
assert len(mcp_call_contents) >= 1
assert len(mcp_result_contents) >= 1
assert all((c.call_id or "") != "mcp_abc123" for c in function_result_contents)
assert any((c.call_id or "") == "mcp_abc123" for c in mcp_call_contents)
assert any((c.call_id or "") == "mcp_abc123" for c in mcp_result_contents)
async def test_multi_turn_reasoning_in_history(self) -> None:
"""Turn 1 produces reasoning + text, turn 2 sees them in history."""
agent = _make_multi_response_agent([
@@ -64,7 +64,6 @@ actions:
### Agent Invocation
- `InvokeAzureAgent` - Call an Azure AI agent
- `InvokePromptAgent` - Call a local prompt agent
### Tool Invocation
- `InvokeFunctionTool` - Call a registered Python function
+4345 -4350
View File
File diff suppressed because it is too large Load Diff