Fix hosting user agent missing

This commit is contained in:
Tao Chen
2026-04-23 13:49:27 -07:00
Unverified
parent 69adf6d97e
commit 03f7dc86d3
4 changed files with 190 additions and 85 deletions
@@ -4,9 +4,6 @@ from __future__ import annotations
import logging
import os
from collections.abc import Generator
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Any, Final
from . import __version__ as version_info
@@ -29,34 +26,60 @@ USER_AGENT_KEY: Final[str] = "User-Agent"
HTTP_USER_AGENT: Final[str] = "agent-framework-python"
AGENT_FRAMEWORK_USER_AGENT = f"{HTTP_USER_AGENT}/{version_info}" # type: ignore[has-type]
_user_agent_prefixes: ContextVar[tuple[str, ...]] = ContextVar("_user_agent_prefixes", default=())
# This environment variable is reserved by the Foundry hosting environment to
# indicate that the agent is running in a hosted environment.
_FOUNDRY_HOSTING_ENV_VAR = "FOUNDRY_HOSTING_ENVIRONMENT"
# This prefix is added to the user agent string when the agent is running in a hosted environment.
_HOSTED_USER_AGENT_PREFIX = "foundry-hosting"
_user_agent_prefixes: set[str] = set()
@contextmanager
def user_agent_prefix(prefix: str) -> Generator[None]:
"""Context manager that adds a prefix to the user agent string for the current scope.
def _add_user_agent_prefix(prefix: str) -> None:
"""Permanently add a prefix to the user agent string.
This is useful for upstream layers that want to identify themselves in telemetry
for the duration of a request without permanently mutating global state.
This is used by hosting layers to identify themselves in telemetry.
Once added, the prefix applies to all subsequent user agent strings.
Args:
prefix: The prefix to add (e.g. "foundry-hosting").
"""
current = _user_agent_prefixes.get()
token = _user_agent_prefixes.set((*current, prefix)) if prefix and prefix not in current else None
if prefix:
_user_agent_prefixes.add(prefix)
def _detect_hosted_environment() -> None:
"""Detect if running in a hosted environment and add the user agent prefix.
Checks the ``FOUNDRY_HOSTING_ENVIRONMENT`` env var first, then falls back
to importing ``AgentConfig`` from the agent server SDK as a second layer.
"""
env_value = os.environ.get(_FOUNDRY_HOSTING_ENV_VAR)
if env_value is not None:
# Env var exists — trust its value and skip the fallback.
if env_value:
_add_user_agent_prefix(_HOSTED_USER_AGENT_PREFIX)
return
# Env var not set — fall back to AgentConfig as a second layer of defense.
try:
yield
finally:
if token is not None:
_user_agent_prefixes.reset(token)
from azure.ai.agentserver.core import AgentConfig # pyright: ignore[reportMissingImports]
if AgentConfig.from_env().is_hosted:
_add_user_agent_prefix(_HOSTED_USER_AGENT_PREFIX)
except (ImportError, AttributeError):
pass
# Run detection of hosted environment on import
_detect_hosted_environment()
def _get_user_agent() -> str:
"""Return the full user agent string including any context-scoped prefixes."""
prefixes = _user_agent_prefixes.get()
if not prefixes:
"""Return the full user agent string including any registered prefixes."""
if not _user_agent_prefixes:
return AGENT_FRAMEWORK_USER_AGENT
return f"{'/'.join(prefixes)}/{AGENT_FRAMEWORK_USER_AGENT}"
return f"{'/'.join(sorted(_user_agent_prefixes))}/{AGENT_FRAMEWORK_USER_AGENT}"
def prepend_agent_framework_to_user_agent(headers: dict[str, Any] | None = None) -> dict[str, Any]:
+139 -41
View File
@@ -1,14 +1,21 @@
# Copyright (c) Microsoft. All rights reserved.
from unittest.mock import patch
import os
from unittest.mock import MagicMock, patch
import agent_framework._telemetry as _telemetry_mod
from agent_framework import (
AGENT_FRAMEWORK_USER_AGENT,
USER_AGENT_KEY,
USER_AGENT_TELEMETRY_DISABLED_ENV_VAR,
prepend_agent_framework_to_user_agent,
)
from agent_framework._telemetry import user_agent_prefix
from agent_framework._telemetry import (
_FOUNDRY_HOSTING_ENV_VAR,
_HOSTED_USER_AGENT_PREFIX,
_add_user_agent_prefix,
_detect_hosted_environment,
)
# region Test constants
@@ -83,7 +90,7 @@ def test_prepend_to_empty_headers():
def test_prepend_to_empty_dict():
"""Test prepending to empty headers dict."""
headers = {}
headers: dict[str, str] = {}
result = prepend_agent_framework_to_user_agent(headers)
assert "User-Agent" in result
@@ -99,54 +106,145 @@ def test_modifies_original_dict():
assert "User-Agent" in headers
# region Test user_agent_prefix context manager
# region Test _add_user_agent_prefix
def test_user_agent_prefix_adds_prefix():
"""Test that the context manager adds a prefix within its scope."""
with user_agent_prefix("test-host"):
result = prepend_agent_framework_to_user_agent()
assert result["User-Agent"].startswith("test-host/")
assert AGENT_FRAMEWORK_USER_AGENT in result["User-Agent"]
# Prefix is removed after exiting the context
def test_add_user_agent_prefix_adds_prefix():
"""Test that _add_user_agent_prefix permanently adds a prefix."""
_telemetry_mod._user_agent_prefixes.clear()
_add_user_agent_prefix("test-host")
result = prepend_agent_framework_to_user_agent()
assert result["User-Agent"] == AGENT_FRAMEWORK_USER_AGENT
assert result["User-Agent"].startswith("test-host/")
assert AGENT_FRAMEWORK_USER_AGENT in result["User-Agent"]
_telemetry_mod._user_agent_prefixes.clear()
def test_user_agent_prefix_ignores_duplicates():
"""Test that duplicate prefixes are not added within nested scopes."""
with user_agent_prefix("test-host"), user_agent_prefix("test-host"):
result = prepend_agent_framework_to_user_agent()
assert result["User-Agent"].count("test-host") == 1
def test_add_user_agent_prefix_ignores_duplicates():
"""Test that duplicate prefixes are not added."""
_telemetry_mod._user_agent_prefixes.clear()
_add_user_agent_prefix("test-host")
_add_user_agent_prefix("test-host")
result = prepend_agent_framework_to_user_agent()
assert result["User-Agent"].count("test-host") == 1
_telemetry_mod._user_agent_prefixes.clear()
def test_user_agent_prefix_ignores_empty():
def test_add_user_agent_prefix_ignores_empty():
"""Test that empty strings are not added as prefixes."""
with user_agent_prefix(""):
result = prepend_agent_framework_to_user_agent()
assert result["User-Agent"] == AGENT_FRAMEWORK_USER_AGENT
def test_user_agent_prefix_restores_on_exit():
"""Test that prefixes are fully restored after the context manager exits."""
with user_agent_prefix("test-host"):
pass
_telemetry_mod._user_agent_prefixes.clear()
_add_user_agent_prefix("")
result = prepend_agent_framework_to_user_agent()
assert result["User-Agent"] == AGENT_FRAMEWORK_USER_AGENT
_telemetry_mod._user_agent_prefixes.clear()
def test_user_agent_prefix_nesting():
"""Test that nested context managers compose prefixes correctly."""
with user_agent_prefix("outer"):
with user_agent_prefix("inner"):
result = prepend_agent_framework_to_user_agent()
assert "outer" in result["User-Agent"]
assert "inner" in result["User-Agent"]
# Inner prefix removed
result = prepend_agent_framework_to_user_agent()
assert "outer" in result["User-Agent"]
assert "inner" not in result["User-Agent"]
# Both removed
def test_add_user_agent_prefix_multiple():
"""Test that multiple prefixes compose correctly."""
_telemetry_mod._user_agent_prefixes.clear()
_add_user_agent_prefix("outer")
_add_user_agent_prefix("inner")
result = prepend_agent_framework_to_user_agent()
assert result["User-Agent"] == AGENT_FRAMEWORK_USER_AGENT
assert "outer" in result["User-Agent"]
assert "inner" in result["User-Agent"]
_telemetry_mod._user_agent_prefixes.clear()
# region Test _detect_hosted_environment
def test_detect_hosted_env_var_truthy_adds_prefix():
"""Test that a truthy FOUNDRY_HOSTING_ENVIRONMENT env var adds the prefix."""
_telemetry_mod._user_agent_prefixes.clear()
with patch.dict("os.environ", {_FOUNDRY_HOSTING_ENV_VAR: "production"}):
_detect_hosted_environment()
assert _HOSTED_USER_AGENT_PREFIX in _telemetry_mod._user_agent_prefixes
_telemetry_mod._user_agent_prefixes.clear()
def test_detect_hosted_env_var_empty_skips_prefix():
"""Test that an empty FOUNDRY_HOSTING_ENVIRONMENT env var does NOT add the prefix."""
_telemetry_mod._user_agent_prefixes.clear()
with patch.dict("os.environ", {_FOUNDRY_HOSTING_ENV_VAR: ""}):
_detect_hosted_environment()
assert _HOSTED_USER_AGENT_PREFIX not in _telemetry_mod._user_agent_prefixes
_telemetry_mod._user_agent_prefixes.clear()
def test_detect_hosted_env_var_set_skips_agent_config_fallback():
"""Test that when the env var is set, AgentConfig is never consulted even if import would fail."""
_telemetry_mod._user_agent_prefixes.clear()
import builtins
real_import = builtins.__import__
def _block_agentconfig(name: str, *args, **kwargs): # type: ignore[no-untyped-def]
if "agentserver" in name:
raise AssertionError("AgentConfig should not be imported when env var is set")
return real_import(name, *args, **kwargs)
with (
patch.dict("os.environ", {_FOUNDRY_HOSTING_ENV_VAR: "prod"}),
patch("builtins.__import__", side_effect=_block_agentconfig),
):
_detect_hosted_environment()
assert _HOSTED_USER_AGENT_PREFIX in _telemetry_mod._user_agent_prefixes
_telemetry_mod._user_agent_prefixes.clear()
def _mock_agent_config(*, is_hosted: bool) -> MagicMock:
"""Create a mock azure.ai.agentserver.core module with AgentConfig."""
mock_config = MagicMock()
mock_config.is_hosted = is_hosted
mock_module = MagicMock()
mock_module.AgentConfig.from_env.return_value = mock_config
return mock_module
def test_detect_hosted_fallback_agent_config_is_hosted():
"""Test that AgentConfig fallback adds the prefix when is_hosted is True."""
_telemetry_mod._user_agent_prefixes.clear()
env = {k: v for k, v in os.environ.items() if k != _FOUNDRY_HOSTING_ENV_VAR}
mock_module = _mock_agent_config(is_hosted=True)
with (
patch.dict("os.environ", env, clear=True),
patch.dict("sys.modules", {"azure.ai.agentserver.core": mock_module}),
):
_detect_hosted_environment()
assert _HOSTED_USER_AGENT_PREFIX in _telemetry_mod._user_agent_prefixes
_telemetry_mod._user_agent_prefixes.clear()
def test_detect_hosted_fallback_agent_config_not_hosted():
"""Test that AgentConfig fallback does NOT add the prefix when is_hosted is False."""
_telemetry_mod._user_agent_prefixes.clear()
mock_module = _mock_agent_config(is_hosted=False)
env = {k: v for k, v in os.environ.items() if k != _FOUNDRY_HOSTING_ENV_VAR}
with (
patch.dict("os.environ", env, clear=True),
patch.dict("sys.modules", {"azure.ai.agentserver.core": mock_module}),
):
_detect_hosted_environment()
assert _HOSTED_USER_AGENT_PREFIX not in _telemetry_mod._user_agent_prefixes
_telemetry_mod._user_agent_prefixes.clear()
def test_detect_hosted_fallback_import_error():
"""Test that ImportError from AgentConfig is silently handled."""
_telemetry_mod._user_agent_prefixes.clear()
env = {k: v for k, v in os.environ.items() if k != _FOUNDRY_HOSTING_ENV_VAR}
with patch.dict("os.environ", env, clear=True):
# The real import may succeed or fail depending on the environment;
# force the ImportError path by making the import raise.
import builtins
real_import = builtins.__import__
def _block_agentconfig(name: str, *args, **kwargs): # type: ignore[no-untyped-def]
if "agentserver" in name:
raise ImportError("mocked")
return real_import(name, *args, **kwargs)
with patch("builtins.__import__", side_effect=_block_agentconfig):
_detect_hosted_environment()
assert _HOSTED_USER_AGENT_PREFIX not in _telemetry_mod._user_agent_prefixes
_telemetry_mod._user_agent_prefixes.clear()
@@ -1,7 +1,6 @@
# Copyright (c) Microsoft. All rights reserved.
from agent_framework import AgentSession, BaseAgent, SupportsAgentRun
from agent_framework._telemetry import user_agent_prefix
from azure.ai.agentserver.invocations import InvocationAgentServerHost
from starlette.requests import Request
from starlette.responses import JSONResponse, Response, StreamingResponse
@@ -11,8 +10,6 @@ from typing_extensions import Any, AsyncGenerator
class InvocationsHostServer(InvocationAgentServerHost):
"""An invocations server host for an agent."""
USER_AGENT_PREFIX = "foundry-hosting"
def __init__(
self,
agent: BaseAgent,
@@ -42,11 +39,6 @@ class InvocationsHostServer(InvocationAgentServerHost):
async def _handle_invoke(self, request: Request) -> Response:
"""Invoke the agent with the given request."""
with user_agent_prefix(self.USER_AGENT_PREFIX):
return await self._handle_invoke_inner(request)
async def _handle_invoke_inner(self, request: Request) -> Response:
"""Core invoke handler logic."""
data = await request.json()
session_id: str = request.state.session_id
@@ -20,7 +20,6 @@ from agent_framework import (
SupportsAgentRun,
WorkflowAgent,
)
from agent_framework._telemetry import user_agent_prefix
from azure.ai.agentserver.responses import (
ResponseContext,
ResponseEventStream,
@@ -90,7 +89,6 @@ logger = logging.getLogger(__name__)
class ResponsesHostServer(ResponsesAgentServerHost):
"""A responses server host for an agent."""
USER_AGENT_PREFIX = "foundry-hosting"
# TODO(@taochen): Allow a different checkpoint storage that stores checkpoints externally
CHECKPOINT_STORAGE_PATH = "/.checkpoints"
@@ -150,37 +148,32 @@ class ResponsesHostServer(ResponsesAgentServerHost):
self._is_workflow_agent = True
self._agent = agent
self.response_handler(self._handler) # pyright: ignore[reportUnknownMemberType]
self.response_handler(self._handle_response) # pyright: ignore[reportUnknownMemberType]
@staticmethod
def _is_streaming_request(request: CreateResponse) -> bool:
"""Check if the request is a streaming request."""
return request.stream is not None and request.stream is True
async def _handler(
async def _handle_response(
self,
request: CreateResponse,
context: ResponseContext,
cancellation_signal: asyncio.Event,
) -> AsyncIterable[ResponseStreamEvent | dict[str, Any]]:
"""Handle the creation of a response."""
with user_agent_prefix(self.USER_AGENT_PREFIX):
async for event in self._handle_inner(request, context, cancellation_signal):
yield event
if self._is_workflow_agent:
# Workflow agents are handled differently because they require checkpoint restoration
return self._handle_workflow_agent(request, context)
async def _handle_inner(
return self._handle_regular_agent(request, context)
async def _handle_regular_agent(
self,
request: CreateResponse,
context: ResponseContext,
cancellation_signal: asyncio.Event,
) -> AsyncIterable[ResponseStreamEvent | dict[str, Any]]:
"""Core handler logic."""
if self._is_workflow_agent:
# Workflow agents are handled differently because they require checkpoint restoration
async for event in self._handle_workflow_agent(request, context, cancellation_signal):
yield event
return
"""Handle the creation of a response for a regular (non-workflow) agent."""
input_text = await context.get_input_text()
history = await context.get_history()
messages: list[str | Content | Message] = [*_to_messages(history), input_text]
@@ -243,7 +236,6 @@ class ResponsesHostServer(ResponsesAgentServerHost):
self,
request: CreateResponse,
context: ResponseContext,
cancellation_signal: asyncio.Event,
) -> AsyncIterable[ResponseStreamEvent | dict[str, Any]]:
"""Handle the creation of a response for a workflow agent.