Python: refactor FoundryHostedAgentHistoryProvider onto Foundry SDK (#5637)

* refactor(foundry_hosting): build FoundryHostedAgentHistoryProvider on azure.ai.agentserver SDK

Rebuilds the Foundry hosted-agent history provider on top of
``azure.ai.agentserver``'s ``FoundryStorageProvider`` instead of the
in-house ``_HttpStorageBackend``. Splits the monolithic ``_responses.py``
into focused modules:

- ``_history_provider.py`` — new ``FoundryHostedAgentHistoryProvider``
  that talks to the SDK's ``FoundryStorageProvider``, threads
  ``response_id`` / ``previous_response_id`` through ``ContextVar``s via
  ``bind_request_context``, and lifts host-bound isolation keys
  (``x-agent-{user,chat}-isolation-key``) from the optional
  ``agent_framework_hosting`` package into a provider-local
  ``IsolationContext`` so the storage layer carries the correct
  partition keys without channels having to know about them.
- ``_shared.py`` — extracts all SDK ``Item`` / ``OutputItem`` ↔
  framework ``Message`` conversion helpers into one place so both
  ``_responses.py`` and the new history provider can share them.
  Restores ``_convert_file_data`` for inline ``input_file`` payloads,
  and the hosted-MCP routing for ``custom_tool_call_output`` items
  whose ``call_id`` carries the ``mcp_*`` prefix.
- ``_ids.py`` — shared id helpers.
- ``_responses.py`` — shrinks ~700 lines, re-exports converters for
  back-compat with existing tests.
- ``tests/test_history_provider.py`` — exercises the new provider
  against a fake SDK backend; the host-isolation test is gated on the
  optional ``agent_framework_hosting`` import.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat(foundry_hosting): add local_storage_root for file-based dev history

Adds an optional `local_storage_root: str | Path | None` parameter to
`FoundryHostedAgentHistoryProvider`. When set and the provider is
running outside a Foundry Hosted Agent container, conversations are
persisted to JSONL files via `agent_framework.FileHistoryProvider`
laid out as:

  {root}/{user_key or '~none'}/{chat_key or '~none'}/{session_id}.jsonl

Hosted mode (FOUNDRY_HOSTING_ENVIRONMENT set) ignores the option with a
one-time INFO log so Foundry storage always wins on the platform. The
in-memory fallback is unchanged when the option is omitted.

Path safety: isolation segments are validated against the same character
allowlist FileHistoryProvider uses for session-id stems and
base64-url-encoded with a reserved "~iso-" prefix when unsafe. "~none"
sentinel for missing keys can never collide with a real isolation key
(real keys starting with "~" are encoded). The resolved target dir is
also re-checked to be inside the configured root.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(foundry_hosting): address PR-1 review comments

- _shared.py:_capture_raw narrows `except Exception` to `except TypeError`
  and emits a WARNING with traceback so the lossy fallback to a
  synthesized round-trip is observable. Mirrors the reviewer suggestion.

- _history_provider.py:save_messages narrows `except Exception` to
  `except FoundryStorageError` so only storage-validation failures
  (4xx/5xx, opaque server errors) are swallowed. Network / TLS / auth
  / payload-builder bugs propagate so the caller can retry / alert.
  Adds an instance-level `failed_writes` counter operators can poll
  for silent-drop visibility.

- _history_provider.py id-stamping loop: drops the
  `contextlib.suppress(AttributeError, TypeError)` around
  `item.id = new_id` so SDK contract changes surface in the test
  suite instead of silently corrupting the chain (the storage backend
  rejects the entire `create_response` with HTTP 500 when synthetic
  prefix-based ids leak through). `import contextlib` removed.

- tests:
  * Unit-cover `foundry_response_id` / `foundry_response_id_factory` /
    `foundry_item_id` so SDK `IdGenerator` contract changes are caught
    locally.
  * Cover the `save_messages` wire payload: required-by-storage fields
    (`background`, `parallel_tool_calls`, `instructions`,
    `agent_reference`), env-var-driven stamping (`FOUNDRY_AGENT_NAME` /
    `FOUNDRY_AGENT_VERSION` / `FOUNDRY_AGENT_SESSION_ID` /
    `MODEL_DEPLOYMENT_NAME` with `AZURE_AI_MODEL_DEPLOYMENT_NAME`
    fallback), and the rule that `model` / `agent_session_id` /
    `agent_reference.version` are omitted (not stamped to `None`) when
    their env vars are unset.
  * Cover the `FOUNDRY_AGENT_SESSION_ID` last-resort chain anchor on
    both the get and save paths, including the prefix gate that blocks
    non-`caresp_*`/`resp_*` values from reaching storage, and the
    precedence rule that a host binding wins over the env.
  * Replace the old `test_save_messages_swallows_backend_errors` with
    two tests asserting the new contract: storage errors are swallowed
    and bump `failed_writes`; everything else propagates and leaves the
    counter at zero.

141 unit tests pass; mypy + pyright + ruff clean.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(foundry_hosting): address PR-1 round-2 review comments

- Hosted detection now delegates to AgentConfig.from_env().is_hosted so
  a future Foundry SDK rename of FOUNDRY_HOSTING_ENVIRONMENT propagates
  automatically; drop the local _ENV_FOUNDRY_HOSTING_ENVIRONMENT
  constant.
- Drop the FOUNDRY_AGENT_SESSION_ID fallback in both get_messages and
  save_messages: per the SDK it identifies the *container instance*,
  not the conversation, so chaining off it would silently merge
  unrelated conversations across container restarts. The host-bound
  previous_response_id (set by ResponsesChannel) is the only
  authoritative anchor; the env value is still stamped into the
  persisted envelope's agent_session_id for operator correlation.
- Update module docstring + replace TestFoundryAgentSessionIdAnchor
  with assertions for the new contract (env var ignored as anchor,
  still stamped onto persisted envelope, host binding wins).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* refactor(foundry_hosting): reconcile with upstream main (#5851, #5666)

Brings the FoundryHostedAgentHistoryProvider refactor branch back into
sync with the foundry_hosting changes that have landed on upstream
main since PR-1 was opened:

* #5851 (path traversal in checkpoint storage, CWE-22).
  The workflow-host code in ``_responses.py`` builds a
  ``FileCheckpointStorage`` from a caller-controlled ``context_id``
  (``previous_response_id`` / ``conversation_id`` / ``response_id``).
  Switch both call sites to route through
  ``_checkpoint_storage_for_context``, which rejects separators,
  NUL bytes, drive letters, absolute paths, and all-dot segments,
  and enforces ``is_relative_to(root)`` before any directory is
  created.

* #5666 (function approval flow).
  Make the SDK-Item → AF-Message conversion helpers in ``_shared.py``
  async and accept an optional ``approval_storage`` keyword:

  - ``_items_to_messages`` / ``_item_to_message`` /
    ``_item_to_message_inner``
  - ``_output_items_to_messages`` / ``_output_item_to_message`` /
    ``_output_item_to_message_inner``

  For ``mcp_approval_request`` / ``mcp_approval_response`` items the
  helpers now load the original function-call Content from the
  approval storage (via ``ApprovalStorage.load_approval_request``)
  instead of synthesising a placeholder. This matches upstream
  semantics and lets approval round-trips reconstruct the real
  payload.

  The ``ApprovalStorage`` Protocol moves to ``_shared.py`` so the
  conversion helpers can reference it without pulling in
  ``_responses.py`` (which would create a circular import). The
  concrete ``InMemoryFunctionApprovalStorage`` and
  ``FileBasedFunctionApprovalStorage`` stay in ``_responses.py``
  next to the host that owns them, and re-export
  ``ApprovalStorage`` from ``_shared`` for compatibility.

  The workflow-host streaming path passes its own
  ``self._approval_storage`` into ``_to_outputs`` so approval
  requests are saved at emit time.

* Bump ``_history_provider.FoundryHostedAgentHistoryProvider.get_messages``
  to ``await`` the now-async ``_output_items_to_messages`` call.

No public API change beyond the new keyword-only ``approval_storage``
parameter on the four conversion entry points.

Validation:
- uv run poe check-packages -P foundry_hosting (lint + pyright clean)
- uv run poe mypy -P foundry_hosting (clean)
- uv run poe test -P foundry_hosting (183 passed, 1 skipped)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Eduard van Valkenburg
2026-05-22 15:42:06 +02:00
committed by GitHub
Unverified
parent 0cb9b52a4b
commit 4c317eb7cf
8 changed files with 3922 additions and 783 deletions
@@ -2,6 +2,16 @@
import importlib.metadata
from ._history_provider import (
FoundryHostedAgentHistoryProvider,
bind_request_context,
get_current_request_context,
)
from ._ids import (
foundry_item_id,
foundry_response_id,
foundry_response_id_factory,
)
from ._invocations import InvocationsHostServer
from ._responses import ResponsesHostServer
@@ -10,4 +20,13 @@ try:
except importlib.metadata.PackageNotFoundError:
__version__ = "0.0.0"
__all__ = ["InvocationsHostServer", "ResponsesHostServer"]
__all__ = [
"FoundryHostedAgentHistoryProvider",
"InvocationsHostServer",
"ResponsesHostServer",
"bind_request_context",
"foundry_item_id",
"foundry_response_id",
"foundry_response_id_factory",
"get_current_request_context",
]
@@ -0,0 +1,991 @@
# Copyright (c) Microsoft. All rights reserved.
"""Foundry Hosted Agent history provider.
A standalone :class:`agent_framework.HistoryProvider` implementation that
sources conversation history from the Foundry Hosted Agent storage backend.
Transport is delegated to the SDK's
:class:`azure.ai.agentserver.responses.FoundryStorageProvider` (when running
inside a Foundry Hosted Agent container) or
:class:`azure.ai.agentserver.responses.InMemoryResponseProvider` (for local
development). Both implement the same read/write surface
(``get_history_item_ids`` / ``get_items`` / ``create_response``), so this
provider's persistence logic stays backend-agnostic.
Allowed dependencies (deliberately narrow):
* :mod:`agent_framework` (core, for ``HistoryProvider`` / ``Message``)
* :mod:`azure.ai.agentserver.responses` (for the storage backends,
``IsolationContext`` typing, and ``OutputItem`` deserialization)
* :mod:`azure.core.credentials_async` (typing of token credentials)
It MUST NOT depend on any ``agent_framework_hosting*`` package at module
import time. (The host's isolation contextvar is consulted lazily via an
``import`` inside :func:`_host_isolation` so the dependency stays soft.)
Environment variables read:
* ``FOUNDRY_HOSTING_ENVIRONMENT`` — non-empty marks "running inside Foundry"
and selects the SDK-backed storage transport. Detection is delegated to
:class:`azure.ai.agentserver.core.AgentConfig` so a future SDK rename
propagates without touching this module.
* ``FOUNDRY_PROJECT_ENDPOINT`` — base URL of the Foundry project; required
when running hosted unless an explicit ``endpoint=`` is supplied.
* ``FOUNDRY_AGENT_NAME`` / ``FOUNDRY_AGENT_VERSION`` — stamped onto the
``agent_reference`` field of every persisted response envelope.
* ``MODEL_DEPLOYMENT_NAME`` / ``AZURE_AI_MODEL_DEPLOYMENT_NAME`` — model
field stamped on the persisted envelope (must match a real deployment).
Note on ``FOUNDRY_AGENT_SESSION_ID``: this env var identifies the
*container instance*, not the conversation, so it is **not** consulted as
a fallback ``previous_response_id``. The host-bound
``previous_response_id`` (set by :class:`ResponsesChannel` from the
request envelope) is the authoritative anchor. The value is still
persisted into the ``agent_session_id`` envelope field for operator
correlation only.
Local fallback: when ``FOUNDRY_HOSTING_ENVIRONMENT`` is unset, the provider
transparently falls back to :class:`InMemoryResponseProvider` so the same
agent code runs in dev. Pass ``local_storage_root`` to use a persistent
file-based store instead of in-memory; histories are then laid out as
``{root}/{user_key or "~none"}/{chat_key or "~none"}/{session_id}.jsonl``
via :class:`agent_framework.FileHistoryProvider`.
"""
from __future__ import annotations
import logging
import os
import time
from base64 import urlsafe_b64encode
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar
from agent_framework import FileHistoryProvider, HistoryProvider, Message
from azure.ai.agentserver.core import AgentConfig
from azure.ai.agentserver.responses import (
FoundryStorageProvider,
FoundryStorageSettings,
InMemoryResponseProvider,
IsolationContext,
)
from azure.ai.agentserver.responses._id_generator import IdGenerator
from azure.ai.agentserver.responses.models import OutputItem, ResponseObject
from azure.ai.agentserver.responses.store._foundry_errors import ( # pyright: ignore[reportPrivateUsage]
FoundryBadRequestError,
FoundryResourceNotFoundError,
FoundryStorageError,
)
from ._shared import (
_messages_to_output_items, # pyright: ignore[reportPrivateUsage]
_output_items_to_messages, # pyright: ignore[reportPrivateUsage]
)
if TYPE_CHECKING:
from collections.abc import Iterator, Sequence
from azure.core.credentials_async import AsyncTokenCredential
logger = logging.getLogger(__name__)
# Environment variable name — re-declared (not imported) so this module
# stays decoupled from the private ``azure.ai.agentserver.core._config``
# constants while still matching exactly. Hosted-vs-local detection is
# delegated to :class:`AgentConfig` so a future SDK rename propagates.
_ENV_FOUNDRY_PROJECT_ENDPOINT = "FOUNDRY_PROJECT_ENDPOINT"
# Per-request isolation context. The owning Channel is expected to set this
# from the inbound request (e.g. user / tenant headers) for the duration of
# an ``agent.run(...)`` call. When unset, requests are made without
# isolation headers (matches how ``ResponseContext`` behaves with no
# ``IsolationContext``).
_isolation_var: ContextVar[IsolationContext | None] = ContextVar(
"agent_framework_foundry_hosting_isolation",
default=None,
)
def set_current_isolation(isolation: IsolationContext | None) -> Any:
"""Set the per-request isolation context for downstream history calls.
Channels that drive an agent backed by :class:`FoundryHostedAgentHistoryProvider`
should call this before invoking ``agent.run(...)`` and reset the token
afterwards.
Args:
isolation: The isolation context to associate with the current
``contextvars`` context, or ``None`` to clear it.
Returns:
A token suitable for :func:`reset_current_isolation` that restores
the previous value.
"""
return _isolation_var.set(isolation)
def reset_current_isolation(token: Any) -> None:
"""Restore a previously-saved isolation context.
Args:
token: A token returned by :func:`set_current_isolation`.
"""
_isolation_var.reset(token)
def get_current_isolation() -> IsolationContext | None:
"""Return the isolation context bound to the current async context, if any.
Returns:
The :class:`IsolationContext` for the current request, or ``None``
when no channel has set one.
"""
return _isolation_var.get()
@dataclass(frozen=True)
class _RequestContext:
"""Per-request anchors the host binds before invoking the agent.
``response_id`` is the id this provider's :meth:`save_messages` call
will write under, so the channel and the storage backend agree on
one stable handle per turn (the channel surfaces the same id on the
response envelope, the next turn arrives with this value as
``previous_response_id`` and the chain walks).
``previous_response_id`` is the prior turn's anchor (``None`` on
first turn). Used to seed ``history_item_ids`` on the new write so
the storage chain stays connected, and to load history without
needing to know the channel's session minting convention.
Per-request Foundry isolation keys (the
``x-agent-{user,chat}-isolation-key`` headers) are *not* carried
here; the host's own ASGI middleware lifts them off every inbound
HTTP request into a contextvar
(:func:`agent_framework_hosting.get_current_isolation_keys`) which
this provider consults at storage-call time. Keeping the headers
out of the per-request bind means channels never have to import
Foundry-specific types and the host owns the (intentional) coupling
to those two well-known headers.
"""
response_id: str
previous_response_id: str | None
_request_var: ContextVar[_RequestContext | None] = ContextVar(
"agent_framework_foundry_hosting_request",
default=None,
)
@contextmanager
def bind_request_context(
*,
response_id: str,
previous_response_id: str | None = None,
**_unused: Any,
) -> Iterator[None]:
"""Bind the per-request response-chain anchors for this provider.
Intended for the host (or any caller orchestrating an
``agent.run(...)``) to call immediately before invocation, so the
provider's :meth:`save_messages` writes under a known, stable
``response_id`` (the same one the channel surfaces to the client)
and walks ``previous_response_id`` for history continuity. Unknown
keyword arguments are accepted and ignored so the host can extend
the ``ChannelRequest.attributes`` contract without breaking existing
providers. Foundry isolation keys flow through a separate
host-installed contextvar; see the class docstring on
:class:`_RequestContext`.
The binding is scoped to the current ``contextvars.Context``, so
concurrent requests in the same process do not interfere.
"""
token = _request_var.set(
_RequestContext(
response_id=response_id,
previous_response_id=previous_response_id,
)
)
try:
yield
finally:
_request_var.reset(token)
def get_current_request_context() -> _RequestContext | None:
"""Return the per-request response chain anchors, if bound."""
return _request_var.get()
def _host_isolation() -> IsolationContext | None:
"""Lift the host-bound isolation contextvar into our local type.
The host installs an ASGI middleware that reads
``x-agent-{user,chat}-isolation-key`` off every inbound HTTP request
and stores them in a generic ``IsolationKeys`` slot on a contextvar
we import from :mod:`agent_framework_hosting`. We translate it into
our :class:`IsolationContext` shape on demand so the provider stays
in charge of the storage-side type while the host stays free of any
Foundry-specific dependencies.
"""
# Soft dep: ``agent_framework_hosting`` may not be installed (this
# provider is also usable standalone). The whole block is wrapped in
# ``# pyright: ignore`` so the optional import does not block type
# checking when the package isn't on sys.path; when it is, pyright
# picks up the real types automatically.
try:
from agent_framework_hosting import ( # pyright: ignore[reportMissingImports]
get_current_isolation_keys, # pyright: ignore[reportUnknownVariableType]
)
except ImportError: # pragma: no cover - hosting is a soft dep
return None
keys = get_current_isolation_keys() # pyright: ignore[reportUnknownVariableType]
if keys is None or keys.is_empty: # pyright: ignore[reportUnknownMemberType]
return None
return IsolationContext(
user_key=keys.user_key, # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType]
chat_key=keys.chat_key, # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType]
)
# Type alias for the storage backend surface this provider depends on.
# Both ``FoundryStorageProvider`` and ``InMemoryResponseProvider`` from
# ``azure.ai.agentserver.responses`` expose the same
# ``get_history_item_ids`` / ``get_items`` / ``create_response`` methods.
_StorageBackend = "FoundryStorageProvider | InMemoryResponseProvider"
# Sentinel directory name used in place of a missing ``user_key`` /
# ``chat_key`` when laying out file-based local history. The tilde
# prefix is reserved (``_is_safe_isolation_segment`` rejects keys that
# start with one) so a real isolation key can never collide with the
# sentinel after sanitisation.
_ISOLATION_NONE_MARKER = "~none"
_ISOLATION_ENCODED_PREFIX = "~iso-"
# Windows reserved file/directory stems. Mirrors
# ``FileHistoryProvider._WINDOWS_RESERVED_FILE_STEMS`` so the directory
# layer enforces the same portability constraints the file layer does.
_WINDOWS_RESERVED_STEMS = frozenset({
"CON",
"PRN",
"AUX",
"NUL",
*(f"COM{i}" for i in range(1, 10)),
*(f"LPT{i}" for i in range(1, 10)),
})
def _is_safe_isolation_segment(value: str) -> bool:
"""Return whether ``value`` is safe to use directly as a directory name.
Rules mirror :meth:`FileHistoryProvider._is_literal_session_file_stem_safe`,
with the additional rule that a leading tilde is reserved for our
sentinel/encoded prefixes so real keys can never collide with them.
"""
if (
not value
or value.startswith((".", "~"))
or value.endswith((" ", "."))
or value.upper() in _WINDOWS_RESERVED_STEMS
):
return False
if any(ord(character) < 32 for character in value):
return False
return all(character.isalnum() or character in "._-" for character in value)
def _encode_isolation_segment(value: str | None) -> str:
"""Encode an isolation key into a filesystem-safe directory name.
* ``None`` / empty → ``"~none"`` sentinel.
* Already-safe values pass through unchanged.
* Anything else is base64-url-encoded and prefixed with ``"~iso-"``
so it is unambiguous and never collides with a real (safe) key.
"""
if value is None or value == "":
return _ISOLATION_NONE_MARKER
if _is_safe_isolation_segment(value):
return value
encoded = urlsafe_b64encode(value.encode("utf-8")).decode("ascii").rstrip("=")
return f"{_ISOLATION_ENCODED_PREFIX}{encoded}"
class FoundryHostedAgentHistoryProvider(HistoryProvider):
"""``HistoryProvider`` backed by Foundry Hosted Agent storage.
Wraps :class:`azure.ai.agentserver.responses.FoundryStorageProvider`
when running inside a Foundry Hosted Agent container, or
:class:`InMemoryResponseProvider` for local development. The
selection is driven by the ``FOUNDRY_HOSTING_ENVIRONMENT``
environment variable.
For local runs that need to *persist* history across process
restarts, pass ``local_storage_root``: the provider then writes
each conversation to
``{root}/{user_key or "~none"}/{chat_key or "~none"}/{session_id}.jsonl``
via :class:`agent_framework.FileHistoryProvider`. The Foundry
response-chain semantics (``previous_response_id`` walking,
``caresp_*`` id stamping, ``ResponseObject`` envelopes) are
bypassed in file mode — the on-disk format is plain JSONL of
:class:`Message` payloads, identical to ``FileHistoryProvider``
standalone usage. ``local_storage_root`` is ignored when running
hosted (Foundry storage always wins).
``session_id`` semantics: in hosted / in-memory mode the value
passed to :meth:`get_messages` and :meth:`save_messages` is treated
as the Responses ``previous_response_id`` (or ``conversation_id``)
whose chain to load. When omitted (and no host-bound chain anchor
is set), :meth:`get_messages` returns an empty list (a fresh
conversation). In file mode ``session_id`` is used as the literal
filename stem (``FileHistoryProvider`` sanitises unsafe values).
"""
DEFAULT_SOURCE_ID: ClassVar[str] = "foundry_hosted_agent"
def __init__(
self,
*,
credential: AsyncTokenCredential | None = None,
endpoint: str | None = None,
history_limit: int = 100,
source_id: str = DEFAULT_SOURCE_ID,
load_messages: bool = True,
store_inputs: bool = True,
store_context_messages: bool = False,
store_context_from: set[str] | None = None,
store_outputs: bool = True,
local_storage_root: str | Path | None = None,
) -> None:
"""Initialize the provider.
Args:
credential: Async token credential used to authenticate against
the Foundry storage API. Required when running hosted
(``FOUNDRY_HOSTING_ENVIRONMENT`` is set). Ignored in
local-mode (the in-memory / file backends need no auth).
endpoint: Foundry project endpoint URL. Defaults to the value
of the ``FOUNDRY_PROJECT_ENDPOINT`` environment variable.
Required when running hosted.
history_limit: Maximum number of history items to fetch per
``get_messages`` call. Mirrors the agent-server runtime's
``ResponseContext._history_limit``. Default ``100``.
Ignored in file mode (``FileHistoryProvider`` returns the
full session file each call).
source_id: Unique identifier for this provider instance, as
required by ``HistoryProvider``.
load_messages: Whether to load messages before invocation.
Default ``True``.
store_inputs: Whether to mirror input messages into Foundry
storage. Default ``True`` — the Foundry Hosted Agents
runtime does not persist Responses turns automatically, so
without this the chain would never be visible to subsequent
requests. Set ``False`` only if you know an external writer
is populating storage on your behalf.
store_context_messages: Whether to mirror context-provider
messages. Default ``False``.
store_context_from: If set, only mirror context messages from
these source IDs.
store_outputs: Whether to mirror response messages into Foundry
storage. Default ``True`` for the same reason as
``store_inputs``.
local_storage_root: When set, *and* the provider is running
outside a Foundry Hosted Agent container, persist history
to JSONL files under
``{root}/{user_key or "~none"}/{chat_key or "~none"}/{session_id}.jsonl``
instead of using the in-memory backend. Ignored when
hosted (with a one-time INFO log). Defaults to ``None``
(in-memory local fallback).
"""
super().__init__(
source_id=source_id,
load_messages=load_messages,
store_inputs=store_inputs,
store_context_messages=store_context_messages,
store_context_from=store_context_from,
store_outputs=store_outputs,
)
self._history_limit = history_limit
self._credential = credential
self._endpoint = endpoint or os.environ.get(_ENV_FOUNDRY_PROJECT_ENDPOINT) or None
self._backend: FoundryStorageProvider | InMemoryResponseProvider | None = None
self._local_storage_root: Path | None = (
Path(local_storage_root).resolve() if local_storage_root is not None else None
)
# Cache one ``FileHistoryProvider`` per (user_key, chat_key)
# tuple. Bounded by the number of distinct isolation scopes the
# process sees; cleared on ``aclose``.
self._file_providers: dict[tuple[str, str], FileHistoryProvider] = {}
self._hosted_local_root_warned = False
if self._local_storage_root is not None and self.is_hosted_environment():
self._warn_hosted_local_root_ignored()
# Observability: number of ``save_messages`` calls dropped by
# :class:`FoundryStorageError` from ``backend.create_response``.
# Operators / health probes can read this attribute directly to
# detect silent persistence loss; never decremented.
self.failed_writes: int = 0
@staticmethod
def is_hosted_environment() -> bool:
"""Return ``True`` when running inside a Foundry Hosted Agent container.
Delegates to :meth:`azure.ai.agentserver.core.AgentConfig.from_env`
so the detection rule stays in lockstep with the Foundry SDK; if
the platform ever renames the underlying signal (today
``FOUNDRY_HOSTING_ENVIRONMENT``) the SDK update is picked up
automatically without a code change here.
"""
return AgentConfig.from_env().is_hosted
def _resolve_backend(self) -> FoundryStorageProvider | InMemoryResponseProvider:
"""Return the storage backend, constructing it lazily on first use.
* If ``FOUNDRY_HOSTING_ENVIRONMENT`` is set, build a
:class:`FoundryStorageProvider` (requires ``credential`` and a
resolved ``endpoint``).
* Otherwise, fall back to a process-local
:class:`InMemoryResponseProvider` so dev/local runs work without
additional configuration.
"""
if self._backend is not None:
return self._backend
if self.is_hosted_environment():
if self._credential is None:
raise RuntimeError(
"FoundryHostedAgentHistoryProvider requires an async credential when running "
"inside a Foundry Hosted Agent container. Pass credential=... ."
)
if not self._endpoint:
raise RuntimeError(
"FoundryHostedAgentHistoryProvider needs a Foundry project endpoint. Pass "
"endpoint=... or set the FOUNDRY_PROJECT_ENDPOINT environment variable."
)
self._backend = FoundryStorageProvider(
credential=self._credential,
settings=FoundryStorageSettings.from_endpoint(self._endpoint),
)
logger.debug(
"FoundryHostedAgentHistoryProvider using FoundryStorageProvider against %s",
self._endpoint,
)
return self._backend
logger.info(
"FOUNDRY_HOSTING_ENVIRONMENT is unset — FoundryHostedAgentHistoryProvider falling "
"back to InMemoryResponseProvider for local development.",
)
self._backend = InMemoryResponseProvider()
return self._backend
async def aclose(self) -> None:
"""Release storage resources held by this provider.
Safe to call multiple times. Closes the lazily-constructed
backend if one was created and drops any cached file-history
providers. ``InMemoryResponseProvider`` and
``FileHistoryProvider`` have no ``aclose`` and are closed
implicitly on garbage collection.
"""
self._file_providers.clear()
if self._backend is None:
return
aclose = getattr(self._backend, "aclose", None)
if aclose is not None:
await aclose()
self._backend = None
def _warn_hosted_local_root_ignored(self) -> None:
"""Log (once) that ``local_storage_root`` is being ignored under hosted mode."""
if self._hosted_local_root_warned:
return
self._hosted_local_root_warned = True
logger.info(
"FoundryHostedAgentHistoryProvider ignored local_storage_root=%s because "
"FOUNDRY_HOSTING_ENVIRONMENT is set; Foundry storage takes precedence "
"when hosted.",
self._local_storage_root,
)
def _resolve_local_file_provider(
self,
isolation: IsolationContext | None,
) -> FileHistoryProvider | None:
"""Return a ``FileHistoryProvider`` for the current isolation, or ``None``.
Returns ``None`` when ``local_storage_root`` is unset *or* the
provider is running in hosted mode (in which case Foundry
storage handles persistence). Otherwise builds — and caches —
one provider per (user_key, chat_key) tuple, rooted at the
sanitised ``{root}/{user_segment}/{chat_segment}`` directory.
Raises:
ValueError: If the resolved isolation directory escapes
``local_storage_root`` (defence in depth — the
sanitisation should already prevent this).
"""
if self._local_storage_root is None:
return None
if self.is_hosted_environment():
self._warn_hosted_local_root_ignored()
return None
user_key = isolation.user_key if isolation is not None else None
chat_key = isolation.chat_key if isolation is not None else None
cache_key = (user_key or "", chat_key or "")
cached = self._file_providers.get(cache_key)
if cached is not None:
return cached
user_segment = _encode_isolation_segment(user_key)
chat_segment = _encode_isolation_segment(chat_key)
target_dir = (self._local_storage_root / user_segment / chat_segment).resolve()
if not target_dir.is_relative_to(self._local_storage_root):
raise ValueError(
"Isolation segments resolved outside of local_storage_root: "
f"user_key={user_key!r} chat_key={chat_key!r}"
)
provider = FileHistoryProvider(
target_dir,
source_id=f"{self.source_id}__file__{user_segment}__{chat_segment}",
load_messages=self.load_messages,
store_inputs=self.store_inputs,
store_context_messages=self.store_context_messages,
store_context_from=self.store_context_from,
store_outputs=self.store_outputs,
)
self._file_providers[cache_key] = provider
logger.debug(
"FoundryHostedAgentHistoryProvider created file backend for isolation (user=%s, chat=%s) at %s",
user_key,
chat_key,
target_dir,
)
return provider
async def get_messages(
self,
session_id: str | None,
*,
state: dict[str, Any] | None = None,
**kwargs: Any,
) -> list[Message]:
"""Load conversation history for the given Foundry response chain.
Args:
session_id: The Responses ``previous_response_id`` /
``conversation_id`` to anchor history on. When ``None`` /
empty, an empty history is returned (fresh conversation).
state: Unused — kept for ``HistoryProvider`` compatibility.
**kwargs: Extensibility hook; ``isolation`` may be supplied
explicitly to override the contextvar.
Returns:
The conversation history materialised as a list of
:class:`agent_framework.Message`, oldest-first.
Notes:
History anchoring follows the Foundry response-id chain. The
preferred anchor is the per-request ``previous_response_id``
bound by the host via :func:`bind_request_context` — that's
the prior turn's resp id, written by *this* provider's
previous :meth:`save_messages` call, so the chain is
guaranteed walkable. When unbound (e.g. local dev calling
the provider directly), we fall back to the ``session_id``
argument as long as it's ``resp_*``-shaped; opaque tokens
(such as chat-isolation-key values) are skipped because the
storage backend rejects them with HTTP 400 "Malformed
identifier".
When ``local_storage_root`` is configured (and the provider
is running outside a Foundry Hosted Agent container), this
method instead delegates to a per-isolation
:class:`FileHistoryProvider` and ``session_id`` is used as
the literal file stem.
"""
isolation = kwargs.get("isolation") or _host_isolation() or get_current_isolation()
file_provider = self._resolve_local_file_provider(isolation)
if file_provider is not None:
return await file_provider.get_messages(session_id, state=state, **kwargs)
bound = get_current_request_context()
# Prefer the host-bound previous_response_id over the session_id
# the framework feeds in: the bound value is the id we ourselves
# wrote on the previous turn, so we know it's storage-valid.
anchor = bound.previous_response_id if bound is not None else None
if anchor is None and session_id and session_id.startswith(("caresp_", "resp_")):
anchor = session_id
if anchor is None:
# No walkable anchor → fresh conversation, nothing to load.
# Note: we intentionally do NOT fall back to
# ``FOUNDRY_AGENT_SESSION_ID`` — per the Foundry SDK that env
# var identifies the *container instance*, not the
# conversation, so it doesn't yield a walkable response-id
# chain. The host-bound ``previous_response_id`` (set by
# ``ResponsesChannel`` from the request envelope) is the
# authoritative anchor.
return []
backend = self._resolve_backend()
try:
item_ids = await backend.get_history_item_ids(
anchor,
None,
self._history_limit,
isolation=isolation,
)
except (FoundryBadRequestError, FoundryResourceNotFoundError) as err:
# 400 / 404 here means the anchor isn't storage-valid — treat
# it as an empty history rather than failing the whole request.
logger.debug(
"get_messages: anchor %r rejected by storage (%s); returning empty history",
anchor,
type(err).__name__,
)
return []
if not item_ids:
return []
items = await backend.get_items(item_ids, isolation=isolation)
# ``get_items`` may return ``None`` placeholders for missing IDs.
resolved = [item for item in items if item is not None]
return await _output_items_to_messages(resolved)
async def save_messages(
self,
session_id: str | None,
messages: Sequence[Message],
*,
state: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
"""Persist messages for ``session_id`` into Foundry storage.
Unlike the standalone ``azure.ai.agentserver`` runtime — which
owns response orchestration end-to-end and writes turns
authoritatively — the Agent Framework hosting stack treats
``HistoryProvider`` as the *only* persistence path. Without this
method actively writing, a deployed hosted agent would silently
drop every turn.
Strategy:
* Use the host-bound ``response_id`` as the envelope id (mints
a fresh ``caresp_*`` id when unbound, e.g. local dev).
* Anchor the new write to the previous turn via
``previous_response_id``, walking the prior turn's history
item ids forward so the full transcript stays visible.
* Split items by role: ``"message"`` (user/system inputs) into
``input_items``, everything else (assistant outputs, tool
calls, reasoning, ...) into ``response.output``.
Args:
session_id: The Responses ``previous_response_id`` /
``conversation_id`` the messages belong to.
messages: The messages selected for persistence by the base
``HistoryProvider`` after-run hook.
state: Unused — kept for ``HistoryProvider`` compatibility.
**kwargs: Extensibility hook; ``isolation`` may be supplied
explicitly to override the contextvar.
Notes:
When ``local_storage_root`` is configured (and the provider
is running outside a Foundry Hosted Agent container), this
method instead delegates to a per-isolation
:class:`FileHistoryProvider` and ``session_id`` is used as
the literal file stem. The Foundry response-chain stamping
described above is bypassed entirely in that mode.
"""
if not messages:
return
isolation = kwargs.get("isolation") or _host_isolation() or get_current_isolation()
file_provider = self._resolve_local_file_provider(isolation)
if file_provider is not None:
await file_provider.save_messages(session_id, messages, state=state, **kwargs)
return
bound = get_current_request_context()
# Prefer the host-bound response_id so the channel envelope and
# the storage write agree on a single id per turn — which is
# what makes the next turn's ``previous_response_id`` walkable.
# Without a binding (e.g. local dev calling ``save_messages``
# directly), fall back to a fresh Foundry-format response id.
# Free-form ``resp_<uuid>`` ids carry no embedded partition key
# and the storage backend rejects writes with a server error;
# ``IdGenerator.new_response_id()`` mints a ``caresp_*`` id with
# the partition-key segment the backend expects. The chain
# walks only when ``session_id`` is itself a ``caresp_*``-shaped
# value (i.e. a previous response id), matching the prefix the
# ``ResponsesChannel`` factory uses.
if bound is not None:
response_id = bound.response_id
previous_response_id = bound.previous_response_id
else:
if not session_id:
return
response_id = IdGenerator.new_response_id()
previous_response_id = session_id if session_id.startswith(("caresp_", "resp_")) else None
# Note: we intentionally do NOT consult ``FOUNDRY_AGENT_SESSION_ID``
# as a fallback ``previous_response_id`` here. Per the Foundry SDK
# that env var identifies the *container instance*, not the
# conversation, so chaining off it produces an unwalkable history.
# The host-bound ``previous_response_id`` (set by
# ``ResponsesChannel`` from the request envelope) is the only
# authoritative anchor; if it's missing the new turn is the start
# of a fresh chain.
logger.debug(
"save_messages: response_id=%r previous_response_id=%r isolation=%s",
response_id,
previous_response_id,
"<set>" if isolation else "<None>",
)
backend = self._resolve_backend()
# The agentserver runtime puts INBOUND items (user/system messages
# the request sent in) in the envelope's ``input_items`` axis and
# OUTBOUND items (assistant outputs, tool calls, reasoning) in
# ``response.output``. See
# ``_resolve_input_items_for_persistence`` (orchestrator.py:61) +
# ``_extract_response_snapshot_from_events`` in
# ``azure.ai.agentserver.responses``: ``input_items`` comes from
# ``ctx.input_items`` (request inputs only); ``response.output``
# is populated from the lifecycle event stream.
#
# Putting everything in ``input_items`` with ``response.output: []``
# is a schema violation that the storage backend rejects with an
# opaque HTTP 500. Split by role to mirror the runtime.
all_items = _messages_to_output_items(list(messages), id_prefix=response_id)
# Re-stamp every item id via ``IdGenerator`` so each carries a
# Foundry-format ``{type-prefix}_<partitionKey><entropy>``
# identifier, with the response_id as the partition-key hint
# (co-locates each item with the response record). Free-form
# ``{response_id}_itm_N`` ids are rejected by the storage
# backend with an opaque HTTP 500 because the partition-key
# extractor cannot parse them. ``IdGenerator.new_item_id``
# dispatches by *Item* (input) type and returns ``None`` for
# our *OutputItem* (storage) instances, so we dispatch by the
# ``type`` discriminator string instead.
ITEM_ID_FACTORY: dict[str, Any] = {
"message": IdGenerator.new_message_item_id,
"output_message": IdGenerator.new_output_message_item_id,
"function_call": IdGenerator.new_function_call_item_id,
"function_call_output": IdGenerator.new_function_call_output_item_id,
"reasoning": IdGenerator.new_reasoning_item_id,
"file_search_call": IdGenerator.new_file_search_call_item_id,
"web_search_call": IdGenerator.new_web_search_call_item_id,
"image_generation_call": IdGenerator.new_image_gen_call_item_id,
"code_interpreter_call": IdGenerator.new_code_interpreter_call_item_id,
"computer_call": IdGenerator.new_computer_call_item_id,
"computer_call_output": IdGenerator.new_computer_call_output_item_id,
"local_shell_call": IdGenerator.new_local_shell_call_item_id,
"local_shell_call_output": IdGenerator.new_local_shell_call_output_item_id,
"mcp_call": IdGenerator.new_mcp_call_item_id,
"mcp_list_tools": IdGenerator.new_mcp_list_tools_item_id,
"mcp_approval_request": IdGenerator.new_mcp_approval_request_item_id,
"mcp_approval_response": IdGenerator.new_mcp_approval_response_item_id,
"custom_tool_call": IdGenerator.new_custom_tool_call_item_id,
"custom_tool_call_output": IdGenerator.new_custom_tool_call_output_item_id,
}
for item in all_items:
factory = ITEM_ID_FACTORY.get(getattr(item, "type", "") or "")
if factory is None:
continue
new_id = factory(response_id)
# Plain attribute assignment — the SDK ``OutputItem`` models
# are ``MutableMapping``s with ``__setattr__`` wired to dict
# set, so this is expected to succeed for every type listed
# above. The previous ``contextlib.suppress`` masked SDK
# contract changes (next save would silently retain the
# synthetic prefix-based id and the storage backend would
# reject the entire ``create_response`` with HTTP 500).
# Letting it raise surfaces those breakages to the test
# suite instead.
item.id = new_id # type: ignore[attr-defined]
input_items: list[Any] = []
output_items: list[Any] = []
for item in all_items:
item_type = getattr(item, "type", None)
if item_type == "message":
input_items.append(item)
else:
# ``output_message``, tool calls, reasoning, etc. all
# belong to the response output stream.
output_items.append(item)
# Walk the previous response's history chain so the new write
# carries the full transcript forward. Without this, each turn
# would only see the messages saved on that very turn.
history_item_ids: list[str] | None = None
if previous_response_id is not None:
try:
history_item_ids = await backend.get_history_item_ids(
previous_response_id,
None,
self._history_limit,
isolation=isolation,
)
except (FoundryBadRequestError, FoundryResourceNotFoundError) as err:
# Don't let history fetch failures torpedo the write —
# we still want to persist the new turn even if the
# chain seed is unreachable for some reason.
logger.warning(
"save_messages: failed to walk previous_response_id=%r (%s); writing new turn without history seed",
previous_response_id,
type(err).__name__,
)
# Mirror what the agentserver runtime serialises onto the wire
# (see ``_extract_response_snapshot_from_events`` +
# ``strip_nulls`` in
# ``azure.ai.agentserver.responses.streaming._helpers``):
#
# * ``agent_reference`` (Required on the response envelope) —
# built from ``FOUNDRY_AGENT_NAME`` / ``FOUNDRY_AGENT_VERSION``,
# which the hosted platform sets per-deploy (sentinel fallback
# for local dev so the envelope stays well-formed).
# * ``agent_session_id`` (S-038) — forcibly stamped by the
# runtime; sourced from ``FOUNDRY_AGENT_SESSION_ID``.
# * ``conversation`` is intentionally omitted: the (user, chat)
# isolation headers are the Foundry storage partition key,
# and the chat-isolation-key value is opaque (the API
# returns "Malformed identifier"/HTTP 400 if used as a
# body-level ``conversation_id``).
# * Per-item ``response_id`` / ``agent_reference`` are NOT
# stamped here — those B20/B21 defaults only apply to items
# inside ``response.output_item.added/done`` *events* (see
# ``_coerce_handler_event``); items inside ``input_items``
# and ``response.output`` go through ``to_output_item`` which
# never sets these fields, and the storage validator returns
# HTTP 400 ``invalid_payload`` when extras leak in.
agent_name = os.environ.get("FOUNDRY_AGENT_NAME") or "agent-framework-host"
agent_version = os.environ.get("FOUNDRY_AGENT_VERSION") or None
agent_reference: dict[str, Any] = {"type": "agent_reference", "name": agent_name}
if agent_version:
agent_reference["version"] = agent_version
agent_session_id = os.environ.get("FOUNDRY_AGENT_SESSION_ID") or None
# ``model`` must be a real deployed model name — the storage
# validator rejects arbitrary strings. Pull it from the
# platform-provided ``MODEL_DEPLOYMENT_NAME`` (set in agent.yaml)
# and fall back to ``AZURE_AI_MODEL_DEPLOYMENT_NAME`` for local
# dev. When neither is set we omit the field entirely (it is
# ``Optional[str]`` per the ResponseObject schema).
model_deployment = (
os.environ.get("MODEL_DEPLOYMENT_NAME") or os.environ.get("AZURE_AI_MODEL_DEPLOYMENT_NAME") or None
)
# Build the wire payload to match exactly what the agentserver
# runtime emits via ``_extract_response_snapshot_from_events``
# for a synthetic ``status=completed`` snapshot:
#
# {id, object, output, created_at, [model], agent_reference,
# status, completed_at, [agent_session_id]}
#
# ``previous_response_id`` is appended when chaining; the runtime
# threads it through the same code path.
now = int(time.time())
response_body: dict[str, Any] = {
"id": response_id,
# SDK mirror: ``streaming/_helpers.py:244`` always stamps
# ``response_id`` alongside ``id`` on the snapshot before it
# reaches ``serialize_create_request``.
"response_id": response_id,
"object": "response",
# S-040 auto-stamp: the orchestrator (``_orchestrator.py:1706``)
# echoes ``background`` from the request to every response
# envelope; storage rejects payloads that omit it.
"background": False,
# ``ResponseObject`` schema (``_models.py:13995``) declares
# ``parallel_tool_calls: bool`` as REQUIRED. The SDK's synthetic
# fallback path (``_build_events``) never sets it because it's
# only invoked for failure recovery; real handler events carry
# it through. Storage rejects payloads that omit it.
"parallel_tool_calls": False,
# Same story for ``instructions`` (``_models.py:13989``) —
# required ``str | list[Item]`` field.
"instructions": "",
"output": [item.as_dict() for item in output_items],
"created_at": now,
"agent_reference": agent_reference,
"status": "completed",
"completed_at": now,
}
if model_deployment is not None:
response_body["model"] = model_deployment
if agent_session_id is not None:
response_body["agent_session_id"] = agent_session_id
if previous_response_id is not None:
response_body["previous_response_id"] = previous_response_id
response = ResponseObject(response_body)
try:
await backend.create_response(
response,
input_items=input_items,
history_item_ids=history_item_ids,
isolation=isolation,
)
except FoundryStorageError as exc:
# Storage-validation failures (4xx ``invalid_payload`` /
# ``not_found``, opaque 5xx) are best-effort losses: the
# caller's run already produced output and we don't want to
# crash the whole turn over a chain-write the user can't
# recover from. They are still observable: every drop bumps
# ``failed_writes`` (operators can poll it / surface in
# health probes) and the full traceback + ``response_body``
# is logged.
#
# Network / TLS / DNS errors, expired-credential 401/403s,
# and bugs in the wire-payload builder above (e.g. a
# required-field regression) deliberately propagate so they
# surface to the caller and trigger retry / alerting paths
# instead of being silently dropped here.
self.failed_writes += 1
err_body = getattr(exc, "response_body", None)
logger.exception(
"FoundryHostedAgentHistoryProvider.save_messages: storage rejected "
"%d message(s) (response_id=%s, previous_response_id=%s, error_body=%s, "
"failed_writes=%d).",
len(messages),
response_id,
previous_response_id,
err_body,
self.failed_writes,
)
return
logger.debug(
"FoundryHostedAgentHistoryProvider.save_messages: persisted %d message(s) "
"(response_id=%s, previous_response_id=%s).",
len(messages),
response_id,
previous_response_id,
)
# Re-export ``OutputItem`` for callers that want to construct test items
# without reaching into the SDK's ``models`` namespace directly.
__all__ = [
"FoundryHostedAgentHistoryProvider",
"OutputItem",
"bind_request_context",
"get_current_isolation",
"get_current_request_context",
"reset_current_isolation",
"set_current_isolation",
]
@@ -0,0 +1,72 @@
# Copyright (c) Microsoft. All rights reserved.
"""Foundry-storage-compatible identifier helpers.
The Foundry hosted-agent storage backend partitions records by extracting
an embedded partition-key segment from every record/item id. The id
format is ``{prefix}_{18charPartitionKey}{32charEntropy}`` (or a 48-char
legacy body). Free-form ids such as ``resp_<uuid hex>`` carry no valid
partition key and the storage API rejects writes with an opaque
``HTTP 500 server_error``.
These helpers wrap :class:`azure.ai.agentserver.responses._id_generator.IdGenerator`
so callers (e.g. the ``ResponsesChannel.response_id_factory`` argument
or :class:`FoundryHostedAgentHistoryProvider.save_messages`) can mint
ids that the storage backend accepts without leaking the SDK import
path into user code.
"""
from __future__ import annotations
from typing import Any
from azure.ai.agentserver.responses._id_generator import IdGenerator
__all__ = [
"foundry_item_id",
"foundry_response_id",
"foundry_response_id_factory",
]
def foundry_response_id(previous_response_id: str | None = None) -> str:
"""Mint a Foundry-storage-compatible response id (``caresp_*``).
Args:
previous_response_id: When supplied (and shaped like a Foundry
id with an embedded partition key), the new id co-locates
with the chain by reusing that partition key. The storage
backend rejects chained writes whose new record sits in a
different partition than the prior one.
Returns:
A new id of the form ``caresp_<18charPartitionKey><32charEntropy>``.
"""
return IdGenerator.new_response_id(previous_response_id or "")
def foundry_response_id_factory() -> "Any":
"""Return a callable suitable for ``ResponsesChannel(response_id_factory=...)``.
The returned callable accepts an optional ``previous_response_id``
hint which the channel passes for chained turns so the new id
inherits the prior turn's partition key (Foundry storage requirement).
"""
return foundry_response_id
def foundry_item_id(item: "Any", response_id: str | None = None) -> str | None:
"""Mint a Foundry-storage-compatible item id for *item*.
Dispatches via :meth:`IdGenerator.new_item_id` so the id picks up
the right type prefix (``msg`` / ``om`` / ``fc`` / ``rs`` / ...).
When ``response_id`` is supplied it acts as a partition-key hint so
every item written under one response co-locates with the response
record (Foundry storage requirement).
Returns:
A new id of the form ``{type-prefix}_<partitionKey><entropy>``,
or ``None`` when *item* is an unrecognised / reference-only type
(mirrors the SDK helper's contract).
"""
return IdGenerator.new_item_id(item, response_id)
@@ -3,17 +3,15 @@
from __future__ import annotations
import asyncio
import base64
import json
import logging
import os
import tempfile
import threading
from collections.abc import AsyncIterable, AsyncIterator, Generator, Mapping, Sequence
from contextlib import suppress
from pathlib import Path
from collections.abc import AsyncIterable, AsyncIterator, Generator
from contextlib import AbstractAsyncContextManager, AsyncExitStack, suppress
from typing import Protocol, cast
from pathlib import Path
from typing import cast
from agent_framework import (
ChatOptions,
@@ -21,7 +19,6 @@ from agent_framework import (
ContextProvider,
FileCheckpointStorage,
HistoryProvider,
Message,
RawAgent,
SupportsAgentRun,
WorkflowAgent,
@@ -32,77 +29,10 @@ from azure.ai.agentserver.responses import (
ResponseEventStream,
ResponseProviderProtocol,
ResponsesServerOptions,
models,
)
from azure.ai.agentserver.responses._id_generator import IdGenerator
from azure.ai.agentserver.responses.hosting import ResponsesAgentServerHost
from azure.ai.agentserver.responses.models import (
ApplyPatchToolCallItemParam,
ApplyPatchToolCallOutputItemParam,
ComputerCallOutputItemParam,
ComputerScreenshotContent,
CreateResponse,
FunctionCallOutputItemParam,
FunctionShellAction,
FunctionShellCallItemParam,
FunctionShellCallOutputContent,
FunctionShellCallOutputExitOutcome,
FunctionShellCallOutputItemParam,
Item,
ItemCodeInterpreterToolCall,
ItemComputerToolCall,
ItemCustomToolCall,
ItemCustomToolCallOutput,
ItemFileSearchToolCall,
ItemFunctionToolCall,
ItemImageGenToolCall,
ItemLocalShellToolCall,
ItemLocalShellToolCallOutput,
ItemMcpApprovalRequest,
ItemMcpToolCall,
ItemMessage,
ItemOutputMessage,
ItemReasoningItem,
ItemWebSearchToolCall,
LocalEnvironmentResource,
MCPApprovalResponse,
MessageContent,
MessageContentInputFileContent,
MessageContentInputImageContent,
MessageContentInputTextContent,
MessageContentOutputTextContent,
MessageContentReasoningTextContent,
MessageContentRefusalContent,
OAuthConsentRequestOutputItem,
OutputItem,
OutputItemApplyPatchToolCall,
OutputItemApplyPatchToolCallOutput,
OutputItemCodeInterpreterToolCall,
OutputItemComputerToolCall,
OutputItemComputerToolCallOutputResource,
OutputItemCustomToolCall,
OutputItemCustomToolCallOutput,
OutputItemFileSearchToolCall,
OutputItemFunctionShellCall,
OutputItemFunctionShellCallOutput,
OutputItemFunctionToolCall,
OutputItemImageGenToolCall,
OutputItemLocalShellToolCall,
OutputItemLocalShellToolCallOutput,
OutputItemMcpApprovalRequest,
OutputItemMcpApprovalResponseResource,
OutputItemMcpToolCall,
OutputItemMessage,
OutputItemOutputMessage,
OutputItemReasoningItem,
OutputItemWebSearchToolCall,
OutputMessageContent,
OutputMessageContentOutputTextContent,
OutputMessageContentRefusalContent,
ResponseStreamEvent,
StructuredOutputsOutputItem,
SummaryTextContent,
TextContent,
)
from azure.ai.agentserver.responses.streaming._builders import (
OutputItemFunctionCallBuilder,
OutputItemMcpCallBuilder,
@@ -114,22 +44,45 @@ from azure.ai.agentserver.responses.streaming._builders import (
from mcp import McpError
from typing_extensions import Any
from ._shared import (
ApprovalStorage,
_arguments_to_str, # pyright: ignore[reportPrivateUsage]
_convert_message_content, # pyright: ignore[reportPrivateUsage]
_convert_output_message_content, # pyright: ignore[reportPrivateUsage]
_item_to_message, # pyright: ignore[reportPrivateUsage]
_items_to_messages, # pyright: ignore[reportPrivateUsage]
_output_item_to_message, # pyright: ignore[reportPrivateUsage]
_output_items_to_messages, # pyright: ignore[reportPrivateUsage]
)
# Re-export the conversion helpers under their historical names so existing
# tests (which import them from this module) keep working — the canonical
# definitions now live in :mod:`._shared`.
__all__ = (
"ApprovalStorage",
"_arguments_to_str",
"_convert_message_content",
"_convert_output_message_content",
"_item_to_message",
"_items_to_messages",
"_output_item_to_message",
"_output_items_to_messages",
)
# Local aliases for the agent-server SDK types this module touches at the
# Python type-annotation layer. Using ``models.X`` everywhere would work but
# would noisily clutter type-only positions where the alias adds no value.
CreateResponse = models.CreateResponse
ResponseStreamEvent = models.ResponseStreamEvent
FunctionShellAction = models.FunctionShellAction
FunctionShellCallOutputContent = models.FunctionShellCallOutputContent
FunctionShellCallOutputExitOutcome = models.FunctionShellCallOutputExitOutcome
LocalEnvironmentResource = models.LocalEnvironmentResource
OAuthConsentRequestOutputItem = models.OAuthConsentRequestOutputItem
logger = logging.getLogger(__name__)
# region Approval Storage
class ApprovalStorage(Protocol):
"""Storage for saving function approval requests."""
async def save_approval_request(self, approval_request_id: str, request: Content) -> None:
"""Save a function approval request under the given ID."""
...
async def load_approval_request(self, approval_request_id: str) -> Content:
"""Load a function approval request by its ID."""
...
class InMemoryFunctionApprovalStorage:
"""An in-memory storage for function approval requests."""
@@ -515,7 +468,7 @@ class ResponsesHostServer(ResponsesAgentServerHost):
by the hosting infrastructure or files will be preserved upon deactivation.
"""
input_items = await context.get_input_items()
input_messages = await _items_to_messages(input_items)
input_messages = await _items_to_messages(input_items, approval_storage=self._approval_storage)
is_streaming_request = request.stream is not None and request.stream is True
_, are_options_set = _to_chat_options(request)
@@ -563,9 +516,9 @@ class ResponsesHostServer(ResponsesAgentServerHost):
# conversation_id when set). When conversation_id is set, this
# matches restore_storage; when only previous_response_id was
# supplied, restore_storage points at the *prior* response's
# directory and write_storage points at the *current* response's.
# directory and checkpoint_storage points at the *current* response's.
write_context_id = context.conversation_id or context.response_id
write_storage = _checkpoint_storage_for_context(self._checkpoint_storage_path, write_context_id)
checkpoint_storage = _checkpoint_storage_for_context(self._checkpoint_storage_path, write_context_id)
# Multi-turn pattern: when we have a prior checkpoint, restore it
# first (drive the workflow back to idle with prior state intact),
@@ -584,6 +537,8 @@ class ResponsesHostServer(ResponsesAgentServerHost):
# items (carried as FunctionResult/FunctionApprovalResponse content)
# that fulfill them via :meth:`WorkflowAgent._process_pending_requests`.
if latest_checkpoint_id is not None:
if restore_storage is None: # pragma: no cover - defensive
raise RuntimeError("Checkpoint restore storage is not configured.")
if is_streaming_request:
async for _ in self._agent.run(
stream=True,
@@ -605,19 +560,19 @@ class ResponsesHostServer(ResponsesAgentServerHost):
yield response_event_stream.emit_in_progress()
if not is_streaming_request:
# Run the agent in non-streaming mode with the new user input.
response = await self._agent.run(
input_messages,
stream=False,
checkpoint_storage=write_storage,
)
# Run the agent in non-streaming mode
response = await self._agent.run(input_messages, stream=False, checkpoint_storage=checkpoint_storage)
for message in response.messages:
for content in message.contents:
async for item in _to_outputs(response_event_stream, content):
async for item in _to_outputs(
response_event_stream,
content,
approval_storage=self._approval_storage,
):
yield item
await self._delete_not_latest_checkpoints(write_storage, self._agent.workflow.name)
await self._delete_not_latest_checkpoints(checkpoint_storage, self._agent.workflow.name)
yield response_event_stream.emit_completed()
return
@@ -625,17 +580,17 @@ class ResponsesHostServer(ResponsesAgentServerHost):
# lazily created on matching content, closed when a different type arrives.
tracker = _OutputItemTracker(response_event_stream)
# Run the workflow agent in streaming mode with the new user input.
async for update in self._agent.run(
input_messages,
stream=True,
checkpoint_storage=write_storage,
):
# Run the workflow agent in streaming mode
async for update in self._agent.run(input_messages, stream=True, checkpoint_storage=checkpoint_storage):
for content in update.contents:
for event in tracker.handle(content):
yield event
if tracker.needs_async:
async for item in _to_outputs(response_event_stream, content):
async for item in _to_outputs(
response_event_stream,
content,
approval_storage=self._approval_storage,
):
yield item
tracker.needs_async = False
@@ -643,7 +598,7 @@ class ResponsesHostServer(ResponsesAgentServerHost):
for event in tracker.close():
yield event
await self._delete_not_latest_checkpoints(write_storage, self._agent.workflow.name)
await self._delete_not_latest_checkpoints(checkpoint_storage, self._agent.workflow.name)
yield response_event_stream.emit_completed()
@staticmethod
@@ -846,681 +801,6 @@ def _to_chat_options(request: CreateResponse) -> tuple[ChatOptions, bool]:
# endregion
# region Input Message Conversion
async def _items_to_messages(
input_items: Sequence[Item], *, approval_storage: ApprovalStorage | None = None
) -> list[Message]:
"""Converts a sequence of input items to a list of Messages, one per item.
Args:
input_items: The input items to convert.
approval_storage: An optional ApprovalStorage instance used to look up
approval requests when converting MCP approval response items.
Returns:
A list of Messages, one per supported input item.
"""
messages: list[Message] = []
for item in input_items:
messages.append(await _item_to_message(item, approval_storage=approval_storage))
return messages
async def _item_to_message(item: Item, *, approval_storage: ApprovalStorage | None = None) -> Message:
"""Converts an Item to a Message.
Args:
item: The Item to convert.
approval_storage: An optional ApprovalStorage instance used to look up
approval requests when converting MCP approval response items.
Returns:
The converted Message.
Raises:
ValueError: If the Item type is not supported.
"""
if item.type == "message":
msg = cast(ItemMessage, item)
if isinstance(msg.content, str):
return Message(role=msg.role, contents=[Content.from_text(msg.content)])
return Message(role=msg.role, contents=[_convert_message_content(part) for part in msg.content])
if item.type == "output_message":
output_msg = cast(ItemOutputMessage, item)
return Message(
role=output_msg.role, contents=[_convert_output_message_content(part) for part in output_msg.content]
)
if item.type == "function_call":
fc = cast(ItemFunctionToolCall, item)
return Message(
role="assistant",
contents=[Content.from_function_call(fc.call_id, fc.name, arguments=fc.arguments)],
)
if item.type == "function_call_output":
fco = cast(FunctionCallOutputItemParam, item)
output = fco.output if isinstance(fco.output, str) else str(fco.output)
return Message(
role="tool",
contents=[Content.from_function_result(fco.call_id, result=output)],
)
if item.type == "reasoning":
reasoning = cast(ItemReasoningItem, item)
reason_contents: list[Content] = []
if reasoning.summary:
for summary in reasoning.summary:
reason_contents.append(Content.from_text(summary.text))
return Message(role="assistant", contents=reason_contents)
if item.type == "mcp_call":
mcp = cast(ItemMcpToolCall, item)
return Message(
role="assistant",
contents=[
Content.from_mcp_server_tool_call(
mcp.id,
mcp.name,
server_name=mcp.server_label,
arguments=mcp.arguments,
)
],
)
if item.type == "mcp_approval_request":
mcp_req = cast(ItemMcpApprovalRequest, item)
if approval_storage is not None:
function_approval_request_content = await approval_storage.load_approval_request(mcp_req.id)
else:
raise ValueError("ApprovalStorage is required to load approval request.")
return Message(
role="assistant",
contents=[function_approval_request_content],
)
if item.type == "mcp_approval_response":
mcp_resp = cast(MCPApprovalResponse, item)
if approval_storage is not None:
function_approval_request_content = await approval_storage.load_approval_request(
mcp_resp.approval_request_id
)
else:
raise ValueError("ApprovalStorage is required to load approval request.")
return Message(
role="user",
contents=[function_approval_request_content.to_function_approval_response(mcp_resp.approve)],
)
if item.type == "code_interpreter_call":
ci = cast(ItemCodeInterpreterToolCall, item)
return Message(
role="assistant",
contents=[Content.from_code_interpreter_tool_call(call_id=ci.id)],
)
if item.type == "image_generation_call":
ig = cast(ItemImageGenToolCall, item)
return Message(
role="assistant",
contents=[Content.from_image_generation_tool_call(image_id=ig.id)],
)
if item.type == "shell_call":
sc = cast(FunctionShellCallItemParam, item)
return Message(
role="assistant",
contents=[
Content.from_shell_tool_call(
call_id=sc.call_id,
commands=sc.action.commands,
status=str(sc.status),
)
],
)
if item.type == "shell_call_output":
sco = cast(FunctionShellCallOutputItemParam, item)
outputs = [
Content.from_shell_command_output(
stdout=out.stdout or "",
stderr=out.stderr or "",
exit_code=getattr(out.outcome, "exit_code", None) if hasattr(out, "outcome") else None,
)
for out in (sco.output or [])
]
return Message(
role="tool",
contents=[
Content.from_shell_tool_result(
call_id=sco.call_id,
outputs=outputs,
max_output_length=sco.max_output_length,
)
],
)
if item.type == "local_shell_call":
lsc = cast(ItemLocalShellToolCall, item)
commands = lsc.action.command if hasattr(lsc.action, "command") and lsc.action.command else []
return Message(
role="assistant",
contents=[
Content.from_shell_tool_call(
call_id=lsc.call_id,
commands=commands,
status=str(lsc.status),
)
],
)
if item.type == "local_shell_call_output":
lsco = cast(ItemLocalShellToolCallOutput, item)
return Message(
role="tool",
contents=[
Content.from_shell_tool_result(
call_id=lsco.id,
outputs=[Content.from_shell_command_output(stdout=lsco.output)],
)
],
)
if item.type == "file_search_call":
fs = cast(ItemFileSearchToolCall, item)
return Message(
role="assistant",
contents=[
Content.from_function_call(
fs.id,
"file_search",
arguments=json.dumps({"queries": fs.queries}),
)
],
)
if item.type == "web_search_call":
ws = cast(ItemWebSearchToolCall, item)
return Message(
role="assistant",
contents=[Content.from_function_call(ws.id, "web_search")],
)
if item.type == "computer_call":
cc = cast(ItemComputerToolCall, item)
return Message(
role="assistant",
contents=[
Content.from_function_call(
cc.call_id,
"computer_use",
arguments=str(cc.action),
)
],
)
if item.type == "computer_call_output":
cco = cast(ComputerCallOutputItemParam, item)
return Message(
role="tool",
contents=[Content.from_function_result(cco.call_id, result=str(cco.output))],
)
if item.type == "custom_tool_call":
ct = cast(ItemCustomToolCall, item)
return Message(
role="assistant",
contents=[Content.from_function_call(ct.call_id, ct.name, arguments=ct.input)],
)
if item.type == "custom_tool_call_output":
cto = cast(ItemCustomToolCallOutput, item)
output = cto.output if isinstance(cto.output, str) else str(cto.output)
# Hosted-MCP results land here because the host writes them via
# `aoutput_item_custom_tool_call_output` (see `_to_outputs` for
# `mcp_server_tool_result`). The persisted `call_id` keeps its
# `mcp_*` prefix; on read, route those back to a hosted-MCP result
# Content so the chat-client serialize layer can coalesce them
# onto a single `mcp_call` input item with `output` populated.
# Issue #5546.
if cto.call_id and cto.call_id.startswith("mcp_"):
return Message(
role="tool",
contents=[Content.from_mcp_server_tool_result(call_id=cto.call_id, output=output)],
)
return Message(
role="tool",
contents=[Content.from_function_result(cto.call_id, result=output)],
)
if item.type == "apply_patch_call":
ap = cast(ApplyPatchToolCallItemParam, item)
return Message(
role="assistant",
contents=[
Content.from_function_call(
ap.call_id,
"apply_patch",
arguments=str(ap.operation),
)
],
)
if item.type == "apply_patch_call_output":
apo = cast(ApplyPatchToolCallOutputItemParam, item)
return Message(
role="tool",
contents=[Content.from_function_result(apo.call_id, result=apo.output or "")],
)
raise ValueError(f"Unsupported Item type: {item.type}")
async def _output_items_to_messages(
history: Sequence[OutputItem],
*,
approval_storage: ApprovalStorage | None = None,
) -> list[Message]:
"""Converts a sequence of OutputItem objects to a list of Message objects.
Args:
history (Sequence[OutputItem]): The sequence of OutputItem objects to convert.
approval_storage (ApprovalStorage | None, optional): The approval storage to use for
resolving MCP approval requests. Defaults to None.
Returns:
list[Message]: The list of Message objects.
"""
messages: list[Message] = []
for item in history:
messages.append(await _output_item_to_message(item, approval_storage=approval_storage))
return messages
async def _output_item_to_message(item: OutputItem, *, approval_storage: ApprovalStorage | None = None) -> Message:
"""Converts an OutputItem to a Message.
Args:
item (OutputItem): The OutputItem to convert.
approval_storage (ApprovalStorage | None, optional): The approval storage to use for
resolving MCP approval requests. Defaults to None.
Returns:
Message: The converted Message.
Raises:
ValueError: If the OutputItem type is not supported.
"""
if item.type == "output_message":
output_msg = cast(OutputItemOutputMessage, item)
return Message(
role=output_msg.role, contents=[_convert_output_message_content(part) for part in output_msg.content]
)
if item.type == "message":
msg = cast(OutputItemMessage, item)
return Message(role=msg.role, contents=[_convert_message_content(part) for part in msg.content])
if item.type == "function_call":
fc = cast(OutputItemFunctionToolCall, item)
return Message(
role="assistant",
contents=[Content.from_function_call(fc.call_id, fc.name, arguments=fc.arguments)],
)
if item.type == "function_call_output":
fco = cast(FunctionCallOutputItemParam, item)
output = fco.output if isinstance(fco.output, str) else str(fco.output)
return Message(
role="tool",
contents=[Content.from_function_result(fco.call_id, result=output)],
)
if item.type == "reasoning":
reasoning = cast(OutputItemReasoningItem, item)
contents: list[Content] = []
if reasoning.summary:
for summary in reasoning.summary:
contents.append(Content.from_text(summary.text))
return Message(role="assistant", contents=contents)
if item.type == "mcp_call":
mcp = cast(OutputItemMcpToolCall, item)
return Message(
role="assistant",
contents=[
Content.from_mcp_server_tool_call(
mcp.id,
mcp.name,
server_name=mcp.server_label,
arguments=mcp.arguments,
)
],
)
if item.type == "mcp_approval_request":
mcp_req = cast(OutputItemMcpApprovalRequest, item)
if approval_storage is not None:
function_approval_request_content = await approval_storage.load_approval_request(mcp_req.id)
else:
raise ValueError("ApprovalStorage is required to load approval request.")
return Message(
role="assistant",
contents=[function_approval_request_content],
)
if item.type == "mcp_approval_response":
mcp_resp = cast(OutputItemMcpApprovalResponseResource, item)
if approval_storage is not None:
function_approval_request_content = await approval_storage.load_approval_request(
mcp_resp.approval_request_id
)
else:
raise ValueError("ApprovalStorage is required to load approval request.")
return Message(
role="user",
contents=[function_approval_request_content.to_function_approval_response(mcp_resp.approve)],
)
if item.type == "code_interpreter_call":
ci = cast(OutputItemCodeInterpreterToolCall, item)
return Message(
role="assistant",
contents=[Content.from_code_interpreter_tool_call(call_id=ci.id)],
)
if item.type == "image_generation_call":
ig = cast(OutputItemImageGenToolCall, item)
return Message(
role="assistant",
contents=[Content.from_image_generation_tool_call(image_id=ig.id)],
)
if item.type == "shell_call":
sc = cast(OutputItemFunctionShellCall, item)
return Message(
role="assistant",
contents=[
Content.from_shell_tool_call(
call_id=sc.call_id,
commands=sc.action.commands,
status=str(sc.status),
)
],
)
if item.type == "shell_call_output":
sco = cast(OutputItemFunctionShellCallOutput, item)
outputs = [
Content.from_shell_command_output(
stdout=out.stdout or "",
stderr=out.stderr or "",
exit_code=getattr(out.outcome, "exit_code", None) if hasattr(out, "outcome") else None,
)
for out in (sco.output or [])
]
return Message(
role="tool",
contents=[
Content.from_shell_tool_result(
call_id=sco.call_id,
outputs=outputs,
max_output_length=sco.max_output_length,
)
],
)
if item.type == "local_shell_call":
lsc = cast(OutputItemLocalShellToolCall, item)
commands = lsc.action.command if hasattr(lsc.action, "command") and lsc.action.command else []
return Message(
role="assistant",
contents=[
Content.from_shell_tool_call(
call_id=lsc.call_id,
commands=commands,
status=str(lsc.status),
)
],
)
if item.type == "local_shell_call_output":
lsco = cast(OutputItemLocalShellToolCallOutput, item)
return Message(
role="tool",
contents=[
Content.from_shell_tool_result(
call_id=lsco.id,
outputs=[Content.from_shell_command_output(stdout=lsco.output)],
)
],
)
if item.type == "file_search_call":
fs = cast(OutputItemFileSearchToolCall, item)
return Message(
role="assistant",
contents=[
Content.from_function_call(
fs.id,
"file_search",
arguments=json.dumps({"queries": fs.queries}),
)
],
)
if item.type == "web_search_call":
ws = cast(OutputItemWebSearchToolCall, item)
return Message(
role="assistant",
contents=[Content.from_function_call(ws.id, "web_search")],
)
if item.type == "computer_call":
cc = cast(OutputItemComputerToolCall, item)
return Message(
role="assistant",
contents=[
Content.from_function_call(
cc.call_id,
"computer_use",
arguments=str(cc.action),
)
],
)
if item.type == "computer_call_output":
cco = cast(OutputItemComputerToolCallOutputResource, item)
return Message(
role="tool",
contents=[Content.from_function_result(cco.call_id, result=str(cco.output))],
)
if item.type == "custom_tool_call":
ct = cast(OutputItemCustomToolCall, item)
return Message(
role="assistant",
contents=[Content.from_function_call(ct.call_id, ct.name, arguments=ct.input)],
)
if item.type == "custom_tool_call_output":
cto = cast(OutputItemCustomToolCallOutput, item)
output = cto.output if isinstance(cto.output, str) else str(cto.output)
# Hosted-MCP results land here because the host writes them via
# `aoutput_item_custom_tool_call_output`. Route `mcp_*` call_ids
# back to a hosted-MCP result Content so the chat-client serialize
# layer can coalesce onto the matching `mcp_call` input item.
# Issue #5546.
if cto.call_id and cto.call_id.startswith("mcp_"):
return Message(
role="tool",
contents=[Content.from_mcp_server_tool_result(call_id=cto.call_id, output=output)],
)
return Message(
role="tool",
contents=[Content.from_function_result(cto.call_id, result=output)],
)
if item.type == "apply_patch_call":
ap = cast(OutputItemApplyPatchToolCall, item)
return Message(
role="assistant",
contents=[
Content.from_function_call(
ap.call_id,
"apply_patch",
arguments=str(ap.operation),
)
],
)
if item.type == "apply_patch_call_output":
apo = cast(OutputItemApplyPatchToolCallOutput, item)
return Message(
role="tool",
contents=[Content.from_function_result(apo.call_id, result=apo.output or "")],
)
if item.type == "oauth_consent_request":
oauth = cast(OAuthConsentRequestOutputItem, item)
return Message(
role="assistant",
contents=[Content.from_oauth_consent_request(oauth.consent_link)],
)
if item.type == "structured_outputs":
so = cast(StructuredOutputsOutputItem, item)
text = json.dumps(so.output) if not isinstance(so.output, str) else so.output
return Message(role="assistant", contents=[Content.from_text(text)])
raise ValueError(f"Unsupported OutputItem type: {item.type}")
def _convert_output_message_content(content: OutputMessageContent) -> Content:
"""Converts an OutputMessageContent to a Content object.
Args:
content (OutputMessageContent): The OutputMessageContent to convert.
Returns:
Content: The converted Content object.
Raises:
ValueError: If the OutputMessageContent type is not supported.
"""
if content.type == "output_text":
text_content = cast(OutputMessageContentOutputTextContent, content)
return Content.from_text(text_content.text)
if content.type == "refusal":
refusal_content = cast(OutputMessageContentRefusalContent, content)
return Content.from_text(refusal_content.refusal)
raise ValueError(f"Unsupported OutputMessageContent type: {content.type}")
def _convert_file_data(data_uri: str, filename: str | None = None) -> Content:
"""Convert a file_data data URI to a Content object.
For text/* MIME types, decodes the base64 content and returns it as text.
For other types, returns a URI-based Content with the filename preserved.
"""
# Parse data URI: data:<media_type>;base64,<data>
if data_uri.startswith("data:") and ";base64," in data_uri:
header, encoded = data_uri.split(";base64,", 1)
media_type = header[len("data:") :]
if media_type.startswith("text/"):
try:
decoded_text = base64.b64decode(encoded).decode("utf-8")
except (ValueError, UnicodeDecodeError):
logger.warning(
"Failed to decode text/* file_data as UTF-8, falling through to URI passthrough.",
exc_info=True,
)
else:
prefix = f"[File: {filename}]\n" if filename else ""
return Content.from_text(f"{prefix}{decoded_text}")
additional_properties = {"filename": filename} if filename else None
return Content.from_uri(data_uri, additional_properties=additional_properties)
def _convert_message_content(content: MessageContent) -> Content:
"""Converts a MessageContent to a Content object.
Args:
content (MessageContent): The MessageContent to convert.
Returns:
Content: The converted Content object.
Raises:
ValueError: If the MessageContent type is not supported.
"""
if content.type == "input_text":
input_text = cast(MessageContentInputTextContent, content)
return Content.from_text(input_text.text)
if content.type == "output_text":
output_text = cast(MessageContentOutputTextContent, content)
return Content.from_text(output_text.text)
if content.type == "text":
text = cast(TextContent, content)
return Content.from_text(text.text)
if content.type == "summary_text":
summary = cast(SummaryTextContent, content)
return Content.from_text(summary.text)
if content.type == "refusal":
refusal = cast(MessageContentRefusalContent, content)
return Content.from_text(refusal.refusal)
if content.type == "reasoning_text":
reasoning = cast(MessageContentReasoningTextContent, content)
return Content.from_text_reasoning(text=reasoning.text)
if content.type == "input_image":
image = cast(MessageContentInputImageContent, content)
if image.image_url:
if image.image_url.startswith("data:"):
return Content.from_uri(image.image_url)
return Content.from_uri(image.image_url, media_type="image/*")
if image.file_id:
return Content.from_hosted_file(image.file_id)
if content.type == "input_file":
file = cast(MessageContentInputFileContent, content)
if file.file_url:
return Content.from_uri(file.file_url)
if file.file_id:
return Content.from_hosted_file(file.file_id, name=file.filename)
if file.file_data:
return _convert_file_data(file.file_data, file.filename)
if content.type == "computer_screenshot":
screenshot = cast(ComputerScreenshotContent, content)
return Content.from_uri(screenshot.image_url)
raise ValueError(f"Unsupported MessageContent type: {content.type}")
# endregion
# region Output Item Conversion
def _arguments_to_str(arguments: str | Mapping[str, Any] | None) -> str:
"""Convert arguments to a JSON string.
Args:
arguments: The arguments to convert, can be a string, mapping, or None.
Returns:
The arguments as a JSON string.
"""
if arguments is None:
return ""
if isinstance(arguments, str):
return arguments
return json.dumps(arguments)
async def _to_outputs(
stream: ResponseEventStream,
content: Content,
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
@@ -2892,6 +2892,8 @@ class TestCheckpointContextPathValidation:
f"before={before} after={after}"
)
assert list(root.iterdir()) == [], f"Checkpoint directory created inside root for {context_field}={bad_id!r}"
# region Agent lifecycle (lazy entry & OAuth consent surfacing)
+1 -1
View File
@@ -605,7 +605,7 @@ dependencies = [
[package.metadata]
requires-dist = [
{ name = "agent-framework-core", editable = "packages/core" },
{ name = "github-copilot-sdk", marker = "python_full_version >= '3.11'", specifier = "<=1.0.0b2,>=1.0.0b2" },
{ name = "github-copilot-sdk", marker = "python_full_version >= '3.11'", specifier = ">=1.0.0b2,<=1.0.0b2" },
]
[[package]]