Python: add agent telemetry (#283)

* add agent telemetry

* updated comments

* update AgentRunResponseUpdate

* updated create_agent var

---------

Co-authored-by: Chris <66376200+crickman@users.noreply.github.com>
This commit is contained in:
Eduard van Valkenburg
2025-07-31 12:29:38 +02:00
committed by GitHub
Unverified
parent e877ffbca1
commit caee8bfa90
6 changed files with 907 additions and 37 deletions
@@ -3,7 +3,7 @@
import sys
from collections.abc import AsyncIterable, Callable, MutableMapping, Sequence
from enum import Enum
from typing import Any, Literal, Protocol, TypeVar, runtime_checkable
from typing import Any, ClassVar, Literal, Protocol, TypeVar, runtime_checkable
from uuid import uuid4
from pydantic import BaseModel, Field
@@ -22,6 +22,7 @@ from ._types import (
ChatToolMode,
)
from .exceptions import AgentExecutionException
from .telemetry import use_agent_telemetry
if sys.version_info >= (3, 11):
from typing import Self # pragma: no cover
@@ -305,9 +306,11 @@ class ChatClientAgentThread(AgentThread):
# region ChatClientAgent
@use_agent_telemetry
class ChatClientAgent(AgentBase):
"""A Chat Client Agent."""
AGENT_SYSTEM_NAME: ClassVar[str] = "microsoft.agent_framework"
chat_client: ChatClient
instructions: str | None = None
chat_options: ChatOptions
@@ -525,7 +528,7 @@ class ChatClientAgent(AgentBase):
response_id=response.response_id,
created_at=response.created_at,
usage_details=response.usage_details,
raw_representation=response.raw_representation,
raw_representation=response,
additional_properties=response.additional_properties,
)
@@ -626,7 +629,7 @@ class ChatClientAgent(AgentBase):
message_id=update.message_id,
created_at=update.created_at,
additional_properties=update.additional_properties,
raw_representation=update.raw_representation,
raw_representation=update,
)
response = ChatResponse.from_chat_response_updates(response_updates)
+309 -24
View File
@@ -18,11 +18,20 @@ from ._pydantic import AFBaseSettings
if TYPE_CHECKING: # pragma: no cover
from opentelemetry.util._decorator import _AgnosticContextManager # type: ignore[reportPrivateUsage]
from ._agents import AgentThread, AIAgent, ChatClientAgent
from ._clients import ChatClientBase
from ._tools import AIFunction
from ._types import ChatMessage, ChatOptions, ChatResponse, ChatResponseUpdate
from ._types import (
AgentRunResponse,
AgentRunResponseUpdate,
ChatMessage,
ChatOptions,
ChatResponse,
ChatResponseUpdate,
)
TChatClientBase = TypeVar("TChatClientBase", bound="ChatClientBase")
TChatClientAgent = TypeVar("TChatClientAgent", bound="ChatClientAgent")
tracer = get_tracer("agent_framework")
logger = get_logger()
@@ -32,6 +41,7 @@ __all__ = [
"APP_INFO",
"USER_AGENT_KEY",
"prepend_agent_framework_to_user_agent",
"use_agent_telemetry",
"use_telemetry",
]
@@ -66,7 +76,8 @@ logger.addFilter(ChatMessageListTimestampFilter())
class GenAIAttributes(str, Enum):
"""Enum to capture the attributes used in OpenTelemetry for Generative AI.
Based on: https://opentelemetry.io/docs/concepts/semantic-conventions/
Based on: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/
and https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-agent-spans/
Should always be used, with `.value` to get the string representation.
"""
@@ -74,24 +85,40 @@ class GenAIAttributes(str, Enum):
OPERATION = "gen_ai.operation.name"
SYSTEM = "gen_ai.system"
ERROR_TYPE = "error.type"
PORT = "server.port"
ADDRESS = "server.address"
SPAN_ID = "SpanId"
TRACE_ID = "TraceId"
# Request attributes
MODEL = "gen_ai.request.model"
SEED = "gen_ai.request.seed"
PORT = "server.port"
ENCODING_FORMATS = "gen_ai.request.encoding_formats"
FREQUENCY_PENALTY = "gen_ai.request.frequency_penalty"
MAX_TOKENS = "gen_ai.request.max_tokens"
PRESENCE_PENALTY = "gen_ai.request.presence_penalty"
STOP_SEQUENCES = "gen_ai.request.stop_sequences"
TEMPERATURE = "gen_ai.request.temperature"
TOP_K = "gen_ai.request.top_k"
TOP_P = "gen_ai.request.top_p"
FINISH_REASON = "gen_ai.response.finish_reason"
CHOICE_COUNT = "gen_ai.request.choice.count"
# Response attributes
FINISH_REASONS = "gen_ai.response.finish_reasons"
RESPONSE_ID = "gen_ai.response.id"
RESPONSE_MODEL = "gen_ai.response.model"
# Usage attributes
INPUT_TOKENS = "gen_ai.usage.input_tokens"
OUTPUT_TOKENS = "gen_ai.usage.output_tokens"
# Tool attributes
TOOL_CALL_ID = "gen_ai.tool.call.id"
TOOL_DESCRIPTION = "gen_ai.tool.description"
TOOL_NAME = "gen_ai.tool.name"
ADDRESS = "server.address"
AGENT_ID = "gen_ai.agent.id"
# Agent attributes
AGENT_NAME = "gen_ai.agent.name"
AGENT_DESCRIPTION = "gen_ai.agent.description"
CONVERSATION_ID = "gen_ai.conversation.id"
DATA_SOURCE_ID = "gen_ai.data_source.id"
OUTPUT_TYPE = "gen_ai.output.type"
# Activity events
EVENT_NAME = "event.name"
@@ -103,12 +130,15 @@ class GenAIAttributes(str, Enum):
PROMPT = "gen_ai.prompt"
# Operation names
CHAT_COMPLETION_OPERATION = "chat.completions"
CHAT_STREAMING_COMPLETION_OPERATION = "chat.streaming_completions"
CHAT_COMPLETION_OPERATION = "chat"
TOOL_EXECUTION_OPERATION = "execute_tool"
# Describes GenAI agent creation and is usually applicable when working with remote agent services.
AGENT_CREATE_OPERATION = "create_agent"
AGENT_INVOKE_OPERATION = "invoke_agent"
# Agent Framework specific attributes
MEASUREMENT_FUNCTION_TAG_NAME = "agent_framework.function.name"
AGENT_FRAMEWORK_GEN_AI_SYSTEM = "microsoft.agent_framework"
ROLE_EVENT_MAP = {
@@ -151,6 +181,9 @@ def prepend_agent_framework_to_user_agent(headers: dict[str, Any]) -> dict[str,
return headers
# region Telemetry utils
class ModelDiagnosticSettings(AFBaseSettings):
"""Settings for model diagnostics.
@@ -228,6 +261,15 @@ def start_as_current_span(
)
def _set_error(span: Span, error: Exception) -> None:
    """Record a failure on ``span``.

    Stores the stringified exception class under the ``error.type`` attribute
    key and switches the span status to ``StatusCode.ERROR``, using
    ``repr(error)`` as the status description.

    Args:
        span: The span that was active when the exception was raised.
        error: The exception to record.
    """
    # NOTE(review): str(type(error)) renders as "<class 'ValueError'>" rather than a bare
    # class name; the tests pin this exact format, so it is preserved as-is.
    error_type = str(type(error))
    span.set_attribute(GenAIAttributes.ERROR_TYPE.value, error_type)
    span.set_status(StatusCode.ERROR, repr(error))
# region ChatClient
def _trace_chat_get_response(
completion_func: Callable[..., Awaitable["ChatResponse"]],
) -> Callable[..., Awaitable["ChatResponse"]]:
@@ -270,7 +312,7 @@ def _trace_chat_get_response(
_set_chat_response_output(current_span, response, self.MODEL_PROVIDER_NAME)
return response
except Exception as exception:
_set_chat_response_error(current_span, exception)
_set_error(current_span, exception)
raise
# Mark the wrapper decorator as a chat completion decorator
@@ -306,7 +348,7 @@ def _trace_chat_get_streaming_response(
with use_span(
_get_chat_response_span(
GenAIAttributes.CHAT_STREAMING_COMPLETION_OPERATION.value,
GenAIAttributes.CHAT_COMPLETION_OPERATION.value,
getattr(self, "ai_model_id", chat_options.ai_model_id or "unknown"),
self.MODEL_PROVIDER_NAME,
self.service_url() if hasattr(self, "service_url") else None,
@@ -323,7 +365,7 @@ def _trace_chat_get_streaming_response(
all_messages_flattened = ChatResponse.from_chat_response_updates(all_updates)
_set_chat_response_output(current_span, all_messages_flattened, self.MODEL_PROVIDER_NAME)
except Exception as exception:
_set_chat_response_error(current_span, exception)
_set_error(current_span, exception)
raise
# Mark the wrapper decorator as a streaming chat completion decorator
@@ -367,6 +409,7 @@ def _get_chat_response_span(
GenAIAttributes.OPERATION.value: operation_name,
GenAIAttributes.SYSTEM.value: model_provider,
GenAIAttributes.MODEL.value: model_name,
GenAIAttributes.CHOICE_COUNT.value: 1,
})
if service_url:
@@ -384,6 +427,8 @@ def _get_chat_response_span(
span.set_attribute(GenAIAttributes.TEMPERATURE.value, chat_options.temperature)
if chat_options.top_p is not None:
span.set_attribute(GenAIAttributes.TOP_P.value, chat_options.top_p)
if chat_options.presence_penalty is not None:
span.set_attribute(GenAIAttributes.PRESENCE_PENALTY.value, chat_options.presence_penalty)
if "top_k" in chat_options.additional_properties:
span.set_attribute(GenAIAttributes.TOP_K.value, chat_options.additional_properties["top_k"])
if "encoding_formats" in chat_options.additional_properties:
@@ -404,15 +449,14 @@ def _set_chat_response_input(
if MODEL_DIAGNOSTICS_SETTINGS.SENSITIVE_EVENTS_ENABLED:
for idx, message in enumerate(messages):
event_name = ROLE_EVENT_MAP.get(message.role.value)
if event_name:
logger.info(
message.model_dump_json(exclude_none=True),
extra={
GenAIAttributes.EVENT_NAME.value: event_name,
GenAIAttributes.SYSTEM.value: model_provider,
ChatMessageListTimestampFilter.INDEX_KEY: idx,
},
)
logger.info(
message.model_dump_json(exclude_none=True),
extra={
GenAIAttributes.EVENT_NAME.value: event_name,
GenAIAttributes.SYSTEM.value: model_provider,
ChatMessageListTimestampFilter.INDEX_KEY: idx,
},
)
def _set_chat_response_output(
@@ -433,7 +477,7 @@ def _set_chat_response_output(
# Set the finish reason
finish_reason = response.finish_reason
if finish_reason:
current_span.set_attribute(GenAIAttributes.FINISH_REASON.value, finish_reason.value)
current_span.set_attribute(GenAIAttributes.FINISH_REASONS.value, [finish_reason.value])
# Set usage attributes
@@ -460,7 +504,248 @@ def _set_chat_response_output(
)
def _set_chat_response_error(span: Span, error: Exception) -> None:
"""Set an error for chat client responses."""
span.set_attribute(GenAIAttributes.ERROR_TYPE.value, str(type(error)))
span.set_status(StatusCode.ERROR, repr(error))
# region Agent
def _trace_agent_run(
    run_func: Callable[..., Awaitable["AgentRunResponse"]],
) -> Callable[..., Awaitable["AgentRunResponse"]]:
    """Decorator to trace non-streaming agent run activities.

    When ``MODEL_DIAGNOSTICS_SETTINGS.ENABLED`` is falsy the wrapped function is
    awaited directly. Otherwise an ``invoke_agent`` span is created, made the
    current span for the duration of the run, and the run's input, output and
    any raised exception are recorded against it.

    Args:
        run_func: The agent ``run`` function to trace.

    Returns:
        The wrapping coroutine function, flagged with
        ``__model_diagnostics_agent_run__ = True``.
    """

    @functools.wraps(run_func)
    async def wrap_run(
        self: "ChatClientAgent",
        messages: "str | ChatMessage | list[str] | list[ChatMessage] | None" = None,
        *,
        thread: "AgentThread | None" = None,
        **kwargs: Any,
    ) -> "AgentRunResponse":
        if not MODEL_DIAGNOSTICS_SETTINGS.ENABLED:
            # Diagnostics are switched off: call straight through without creating a span.
            return await run_func(self, messages=messages, thread=thread, **kwargs)

        run_span = _get_agent_run_span(
            operation_name=GenAIAttributes.AGENT_INVOKE_OPERATION.value,
            agent=self,
            system=self.AGENT_SYSTEM_NAME,
            thread=thread,
            **kwargs,
        )
        with use_span(run_span, end_on_exit=True) as current_span:
            _set_agent_run_input(self.AGENT_SYSTEM_NAME, messages)
            try:
                result = await run_func(self, messages=messages, thread=thread, **kwargs)
                # Kept inside the try so a failure while recording the output is also
                # reported on the span before propagating.
                _set_agent_run_output(current_span, result, self.AGENT_SYSTEM_NAME)
                return result
            except Exception as exception:
                _set_error(current_span, exception)
                raise

    # Mark the wrapper decorator as an agent run decorator
    wrap_run.__model_diagnostics_agent_run__ = True  # type: ignore

    return wrap_run
def _trace_agent_run_streaming(
    run_func: Callable[..., AsyncIterable["AgentRunResponseUpdate"]],
) -> Callable[..., AsyncIterable["AgentRunResponseUpdate"]]:
    """Decorator to trace streaming agent run activities.

    When ``MODEL_DIAGNOSTICS_SETTINGS.ENABLED`` is falsy the updates are passed
    through untouched. Otherwise an ``invoke_agent`` span is created and made
    current, every update is collected while it is yielded to the caller, and
    once the stream is exhausted the collected updates are combined into an
    ``AgentRunResponse`` whose output is recorded on the span.

    Args:
        run_func: The agent ``run_streaming`` function to trace.

    Returns:
        The wrapping async generator function, flagged with
        ``__model_diagnostics_streaming_agent_run__ = True``.
    """

    @functools.wraps(run_func)
    async def wrap_run_streaming(
        self: "ChatClientAgent",
        messages: "str | ChatMessage | list[str] | list[ChatMessage] | None" = None,
        *,
        thread: "AgentThread | None" = None,
        **kwargs: Any,
    ) -> AsyncIterable["AgentRunResponseUpdate"]:
        if not MODEL_DIAGNOSTICS_SETTINGS.ENABLED:
            # Diagnostics are switched off: forward every update as-is, then stop.
            async for passthrough_update in run_func(self, messages=messages, thread=thread, **kwargs):
                yield passthrough_update
            return

        # Runtime import inside the wrapper; presumably deferred to avoid a circular
        # import with ._types - confirm before hoisting to module level.
        from ._types import AgentRunResponse

        collected_updates: list["AgentRunResponseUpdate"] = []

        run_span = _get_agent_run_span(
            operation_name=GenAIAttributes.AGENT_INVOKE_OPERATION.value,
            agent=self,
            system=self.AGENT_SYSTEM_NAME,
            thread=thread,
            **kwargs,
        )
        with use_span(run_span, end_on_exit=True) as current_span:
            _set_agent_run_input(self.AGENT_SYSTEM_NAME, messages)
            try:
                async for update in run_func(self, messages=messages, thread=thread, **kwargs):
                    collected_updates.append(update)
                    yield update

                # The stream is finished: fold the updates into one response for reporting.
                combined_response = AgentRunResponse.from_agent_run_response_updates(collected_updates)
                _set_agent_run_output(current_span, combined_response, self.AGENT_SYSTEM_NAME)
            except Exception as exception:
                _set_error(current_span, exception)
                raise

    # Mark the wrapper decorator as a streaming agent run decorator
    wrap_run_streaming.__model_diagnostics_streaming_agent_run__ = True  # type: ignore
    return wrap_run_streaming
def use_agent_telemetry(cls: type[TChatClientAgent]) -> type[TChatClientAgent]:
    """Class decorator that enables telemetry for an agent.

    Replaces the class's ``run`` and ``run_streaming`` attributes, when present
    and truthy, with their tracing wrappers. A class lacking either method is
    left untouched for that method.

    Args:
        cls: The agent class to instrument; it is modified in place.

    Returns:
        The same class object that was passed in.
    """
    wrappers = (
        ("run", _trace_agent_run),
        ("run_streaming", _trace_agent_run_streaming),
    )
    for method_name, wrap in wrappers:
        method = getattr(cls, method_name, None)
        if method:
            setattr(cls, method_name, wrap(method))
    return cls
def _get_agent_run_span(
    *,
    operation_name: str,
    agent: "AIAgent",
    system: str,
    thread: "AgentThread | None",
    **kwargs: Any,
) -> Span:
    """Start a span for an agent run.

    The span is named ``"<operation_name> <agent.display_name>"`` and carries the
    operation, system, choice count and agent id, plus the agent name and
    description and the thread id when those are set. Recognised request options
    found in ``kwargs`` are copied onto the span as request attributes.

    Note that `start_span` doesn't make the span the current span.
    Use `use_span` to make it the current span as a context manager.

    Should follow: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-agent-spans/#invoke-agent-span

    Args:
        operation_name: The operation name recorded on the span and used in its name.
        agent: The agent being run.
        system: The system name recorded on the span.
        thread: The thread of the run, if any.
        **kwargs: Run options; only the recognised keys listed below are recorded.

    Returns:
        The started span.
    """
    span = tracer.start_span(f"{operation_name} {agent.display_name}")

    # Attributes that are always present on the span
    span.set_attributes({
        GenAIAttributes.OPERATION.value: operation_name,
        GenAIAttributes.SYSTEM.value: system,
        GenAIAttributes.CHOICE_COUNT.value: 1,
        GenAIAttributes.AGENT_ID.value: agent.id,
    })

    # Attributes that depend on the agent and thread carrying a value
    if agent.name:
        span.set_attribute(GenAIAttributes.AGENT_NAME.value, agent.name)
    if agent.description:
        span.set_attribute(GenAIAttributes.AGENT_DESCRIPTION.value, agent.description)
    if thread and thread.id:
        span.set_attribute(GenAIAttributes.CONVERSATION_ID.value, thread.id)

    # Run option name -> span attribute, in the order the attributes are written.
    request_attributes = (
        ("model", GenAIAttributes.MODEL),
        ("seed", GenAIAttributes.SEED),
        ("frequency_penalty", GenAIAttributes.FREQUENCY_PENALTY),
        ("presence_penalty", GenAIAttributes.PRESENCE_PENALTY),
        ("max_tokens", GenAIAttributes.MAX_TOKENS),
        ("stop", GenAIAttributes.STOP_SEQUENCES),
        ("temperature", GenAIAttributes.TEMPERATURE),
        ("top_p", GenAIAttributes.TOP_P),
        ("top_k", GenAIAttributes.TOP_K),
        ("encoding_formats", GenAIAttributes.ENCODING_FORMATS),
    )
    for option_name, attribute in request_attributes:
        # Presence of the key decides, not the value: an explicit None is still forwarded.
        if option_name in kwargs:
            span.set_attribute(attribute.value, kwargs[option_name])

    return span
def _set_agent_run_input(
    system: str,
    messages: "str | ChatMessage | list[str] | list[ChatMessage] | list[str | ChatMessage] | None" = None,
) -> None:
    """Log the input messages of an agent run.

    Nothing is logged unless ``messages`` is non-empty and
    ``MODEL_DIAGNOSTICS_SETTINGS.SENSITIVE_EVENTS_ENABLED`` is set. Each message
    produces one ``logger.info`` record carrying the event name, the system and
    the message's position in the input.

    The logs will be associated to the current span.

    Args:
        system: The system name attached to every log record.
        messages: A single message or a list of messages; plain strings are
            logged as-is, other messages as their JSON dump.
    """
    if not messages or not MODEL_DIAGNOSTICS_SETTINGS.SENSITIVE_EVENTS_ENABLED:
        return

    message_list = messages if isinstance(messages, list) else [messages]
    for idx, message in enumerate(message_list):
        if isinstance(message, str):
            body = message
            # assume user message
            event_name = GenAIAttributes.USER_MESSAGE.value
        else:
            body = message.model_dump_json(exclude_none=True)
            # Roles without an entry in the map yield None as the event name.
            event_name = ROLE_EVENT_MAP.get(message.role.value)
        logger.info(
            body,
            extra={
                GenAIAttributes.EVENT_NAME.value: event_name,
                GenAIAttributes.SYSTEM.value: system,
                ChatMessageListTimestampFilter.INDEX_KEY: idx,
            },
        )
def _set_agent_run_output(
    current_span: Span,
    response: "AgentRunResponse",
    model_provider: str,
) -> None:
    """Set the agent response for a given span.

    Records, when available, the response id (taken from the ``"id"`` entry of
    the first message's ``additional_properties``), the finish reason (read from
    ``response.raw_representation``) and the input/output token counts. When
    ``MODEL_DIAGNOSTICS_SETTINGS.SENSITIVE_EVENTS_ENABLED`` is set, one choice
    event is logged per response message.

    A response without messages (for example one built from an empty stream) is
    handled: the response id and the choice events are skipped instead of
    raising ``IndexError``, so telemetry never turns a successful run into a
    failure.

    Args:
        current_span: The span to annotate.
        response: The agent run response to report.
        model_provider: The system name attached to the logged choice events.
    """
    messages = response.messages or []
    # Guard against message-less responses; indexing blindly would raise IndexError.
    first_completion = messages[0] if messages else None

    # Set the response ID
    response_id = None
    if first_completion is not None and first_completion.additional_properties is not None:
        response_id = first_completion.additional_properties.get("id")
    if response_id:
        current_span.set_attribute(GenAIAttributes.RESPONSE_ID.value, response_id)

    # Set the finish reason
    finish_reason = getattr(response.raw_representation, "finish_reason", None) if response.raw_representation else None
    if finish_reason:
        current_span.set_attribute(GenAIAttributes.FINISH_REASONS.value, [finish_reason.value])

    # Set usage attributes
    usage = response.usage_details
    if usage:
        if usage.input_token_count:
            current_span.set_attribute(GenAIAttributes.INPUT_TOKENS.value, usage.input_token_count)
        if usage.output_token_count:
            current_span.set_attribute(GenAIAttributes.OUTPUT_TOKENS.value, usage.output_token_count)

    # Set the completion event
    if MODEL_DIAGNOSTICS_SETTINGS.SENSITIVE_EVENTS_ENABLED:
        for msg in messages:
            full_response: dict[str, Any] = {
                "message": msg.model_dump(exclude_none=True),
            }
            # NOTE(review): "index" carries the response id rather than a positional
            # index; kept as-is - confirm this is intended.
            full_response["index"] = response.response_id
            logger.info(
                json.dumps(full_response),
                extra={
                    GenAIAttributes.EVENT_NAME.value: GenAIAttributes.CHOICE.value,
                    GenAIAttributes.SYSTEM.value: model_provider,
                },
            )
@@ -6,6 +6,7 @@ from typing import Any
from unittest.mock import Mock, patch
import pytest
from opentelemetry.trace import StatusCode
from agent_framework import (
ChatMessage,
@@ -88,9 +89,9 @@ def test_enum_values():
assert GenAIAttributes.OPERATION.value == "gen_ai.operation.name"
assert GenAIAttributes.SYSTEM.value == "gen_ai.system"
assert GenAIAttributes.MODEL.value == "gen_ai.request.model"
assert GenAIAttributes.CHAT_COMPLETION_OPERATION.value == "chat.completions"
assert GenAIAttributes.CHAT_STREAMING_COMPLETION_OPERATION.value == "chat.streaming_completions"
assert GenAIAttributes.CHAT_COMPLETION_OPERATION.value == "chat"
assert GenAIAttributes.TOOL_EXECUTION_OPERATION.value == "execute_tool"
assert GenAIAttributes.AGENT_INVOKE_OPERATION.value == "invoke_agent"
# region Test prepend_agent_framework_to_user_agent
@@ -485,7 +486,7 @@ async def test_streaming_response_with_exception_via_decorator(mock_chat_client,
patch("agent_framework.telemetry.use_span") as mock_use_span,
patch("agent_framework.telemetry._get_chat_response_span"),
patch("agent_framework.telemetry._set_chat_response_input"),
patch("agent_framework.telemetry._set_chat_response_error") as mock_set_error,
patch("agent_framework.telemetry._set_error") as mock_set_error,
):
mock_span = Mock()
mock_use_span.return_value.__enter__.return_value = mock_span
@@ -610,3 +611,394 @@ def test_prepend_user_agent_with_none_value():
# Should handle None gracefully
assert "User-Agent" in result
assert AGENT_FRAMEWORK_USER_AGENT in str(result["User-Agent"])
# region Test use_agent_telemetry decorator
def test_agent_decorator_with_valid_class():
    """The decorator wraps both ``run`` and ``run_streaming`` of an agent-like class."""
    from agent_framework.telemetry import use_agent_telemetry

    class MockChatClientAgent:
        """Stand-in exposing the attributes and methods the decorator works with."""

        AGENT_SYSTEM_NAME = "test_agent_system"

        def __init__(self):
            self.id = "test_agent_id"
            self.name = "test_agent"
            self.display_name = "Test Agent"
            self.description = "Test agent description"

        async def run(self, messages=None, *, thread=None, **kwargs):
            return Mock()

        async def run_streaming(self, messages=None, *, thread=None, **kwargs):
            async def gen():
                yield Mock()

            return gen()

    instrumented_cls = use_agent_telemetry(MockChatClientAgent)

    # Each wrapper advertises itself through a marker attribute.
    expected_markers = (
        ("run", "__model_diagnostics_agent_run__"),
        ("run_streaming", "__model_diagnostics_streaming_agent_run__"),
    )
    for method_name, marker in expected_markers:
        assert hasattr(getattr(instrumented_cls, method_name), marker)
def test_agent_decorator_with_missing_methods():
    """A class with neither ``run`` nor ``run_streaming`` is decorated without error."""
    from agent_framework.telemetry import use_agent_telemetry

    class MockChatClientAgent:
        """Agent-like class that defines no run methods at all."""

        AGENT_SYSTEM_NAME = "test_agent_system"

    # Must not raise even though there is nothing to wrap
    result = use_agent_telemetry(MockChatClientAgent)

    # The very same class object comes back
    assert result is MockChatClientAgent
def test_agent_decorator_with_partial_methods():
    """Only the method that exists is wrapped; the missing one is not created."""
    from agent_framework.telemetry import use_agent_telemetry

    class MockChatClientAgent:
        """Agent-like class that implements ``run`` but not ``run_streaming``."""

        AGENT_SYSTEM_NAME = "test_agent_system"

        def __init__(self):
            self.id = "test_agent_id"
            self.name = "test_agent"
            self.display_name = "Test Agent"

        async def run(self, messages=None, *, thread=None, **kwargs):
            return Mock()

    instrumented_cls = use_agent_telemetry(MockChatClientAgent)

    # ``run`` carries the wrapper marker ...
    assert hasattr(instrumented_cls.run, "__model_diagnostics_agent_run__")
    # ... and no ``run_streaming`` attribute was added by the decorator.
    assert not hasattr(instrumented_cls, "run_streaming")
# region Test agent telemetry decorator with mock agent
@pytest.fixture
def mock_chat_client_agent():
    """Provide an instance of a freshly defined agent-like class for telemetry tests.

    ``run`` returns a canned ``AgentRunResponse`` and ``run_streaming`` yields two
    canned updates.
    """
    from agent_framework import AgentRunResponse, ChatMessage, ChatRole, UsageDetails

    class MockChatClientAgent:
        AGENT_SYSTEM_NAME = "test_agent_system"

        def __init__(self):
            self.id = "test_agent_id"
            self.name = "test_agent"
            self.display_name = "Test Agent"
            self.description = "Test agent description"

        async def run(self, messages=None, *, thread=None, **kwargs):
            reply = ChatMessage(role=ChatRole.ASSISTANT, text="Agent response")
            token_usage = UsageDetails(input_token_count=15, output_token_count=25)
            return AgentRunResponse(
                messages=[reply],
                usage_details=token_usage,
                response_id="test_response_id",
                raw_representation=Mock(finish_reason=Mock(value="stop")),
            )

        async def run_streaming(self, messages=None, *, thread=None, **kwargs):
            from agent_framework import AgentRunResponseUpdate

            for chunk in ("Hello", " from agent"):
                yield AgentRunResponseUpdate(text=chunk, role=ChatRole.ASSISTANT)

    return MockChatClientAgent()
@pytest.mark.parametrize("model_diagnostic_settings", [(False, False)], indirect=True)
async def test_agent_telemetry_disabled_bypasses_instrumentation(mock_chat_client_agent, model_diagnostic_settings):
    """With agent diagnostics disabled, ``run`` must not open a span."""
    from agent_framework.telemetry import use_agent_telemetry

    instrumented_cls = use_agent_telemetry(type(mock_chat_client_agent))
    instance = instrumented_cls()

    with (
        patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings),
        patch("agent_framework.telemetry.use_span") as mock_use_span,
    ):
        # The call goes straight through to the undecorated run
        result = await instance.run("Test message")

        assert result is not None
        mock_use_span.assert_not_called()
@pytest.mark.parametrize("model_diagnostic_settings", [(True, True)], indirect=True)
async def test_agent_instrumentation_enabled(mock_chat_client_agent, model_diagnostic_settings):
    """With agent diagnostics enabled, ``run`` opens a span and logs its events."""
    from agent_framework.telemetry import use_agent_telemetry

    instrumented_cls = use_agent_telemetry(type(mock_chat_client_agent))
    instance = instrumented_cls()

    with (
        patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings),
        patch("agent_framework.telemetry.use_span") as mock_use_span,
        patch("agent_framework.telemetry.logger") as mock_logger,
    ):
        result = await instance.run("Test message")

        assert result is not None
        mock_use_span.assert_called_once()

        # One log record for the input message and one for the response message
        assert mock_logger.info.call_count == 2
@pytest.mark.parametrize("model_diagnostic_settings", [(True, False)], indirect=True)
async def test_agent_streaming_response_with_diagnostics_enabled_via_decorator(
    mock_chat_client_agent, model_diagnostic_settings
):
    """Streaming through the decorated agent yields every update and reports telemetry."""
    from agent_framework.telemetry import use_agent_telemetry

    instrumented_cls = use_agent_telemetry(type(mock_chat_client_agent))
    instance = instrumented_cls()

    with (
        patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings),
        patch("agent_framework.telemetry.use_span") as mock_use_span,
        patch("agent_framework.telemetry._get_agent_run_span") as mock_get_span,
        patch("agent_framework.telemetry._set_agent_run_input") as mock_set_input,
        patch("agent_framework.telemetry._set_agent_run_output") as mock_set_output,
    ):
        # Make the patched use_span behave as a context manager handing out a mock span
        span_context = mock_use_span.return_value
        span_context.__enter__.return_value = Mock()
        span_context.__exit__.return_value = None

        # Drain the stream
        received = [update async for update in instance.run_streaming("Test message")]

        # Both updates produced by the fixture agent arrive
        assert len(received) == 2

        # Span creation, input and output were each reported exactly once
        mock_get_span.assert_called_once()
        mock_set_input.assert_called_once_with("test_agent_system", "Test message")
        mock_set_output.assert_called_once()
@pytest.mark.parametrize("model_diagnostic_settings", [(True, False)], indirect=True)
async def test_agent_streaming_response_with_exception_via_decorator(mock_chat_client_agent, model_diagnostic_settings):
    """An exception raised mid-stream is recorded via ``_set_error`` and re-raised."""
    from agent_framework.telemetry import use_agent_telemetry

    async def run_streaming(self, messages=None, *, thread=None, **kwargs):
        from agent_framework import AgentRunResponseUpdate, ChatRole

        # Emit one update, then fail
        yield AgentRunResponseUpdate(text="Partial", role=ChatRole.ASSISTANT)
        raise ValueError("Test agent streaming error")

    agent_cls = type(mock_chat_client_agent)
    agent_cls.run_streaming = run_streaming

    instrumented_cls = use_agent_telemetry(agent_cls)
    instance = instrumented_cls()

    with (
        patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings),
        patch("agent_framework.telemetry.use_span") as mock_use_span,
        patch("agent_framework.telemetry._get_agent_run_span"),
        patch("agent_framework.telemetry._set_agent_run_input"),
        patch("agent_framework.telemetry._set_error") as mock_set_error,
    ):
        span_context = mock_use_span.return_value
        span_context.__enter__.return_value = Mock()
        span_context.__exit__.return_value = None

        # The failure must surface to the consumer of the stream
        with pytest.raises(ValueError, match="Test agent streaming error"):
            async for _ in instance.run_streaming("Test message"):
                pass

        # The error handler saw the exception exactly once
        mock_set_error.assert_called_once()
        recorded_error = mock_set_error.call_args[0][1]
        assert isinstance(recorded_error, ValueError)
@pytest.mark.parametrize("model_diagnostic_settings", [(False, False)], indirect=True)
async def test_agent_streaming_response_diagnostics_disabled_via_decorator(model_diagnostic_settings):
    """With diagnostics disabled, streaming passes updates through and creates no span."""
    from agent_framework import AgentRunResponseUpdate, ChatRole
    from agent_framework.telemetry import use_agent_telemetry

    class MockStreamingAgentNoDiagnostics:
        """Agent-like class whose stream yields a single update."""

        AGENT_SYSTEM_NAME = "test_agent_system"

        def __init__(self):
            self.id = "test_agent_id"
            self.name = "test_agent"
            self.display_name = "Test Agent"

        async def run_streaming(self, messages=None, *, thread=None, **kwargs):
            yield AgentRunResponseUpdate(text="Test", role=ChatRole.ASSISTANT)

    instrumented_cls = use_agent_telemetry(MockStreamingAgentNoDiagnostics)
    instance = instrumented_cls()

    with (
        patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings),
        patch("agent_framework.telemetry._get_agent_run_span") as mock_get_span,
    ):
        # Drain the stream; no span should be started along the way
        received = [update async for update in instance.run_streaming("Test message")]

        assert len(received) == 1

        # The span factory was never touched
        mock_get_span.assert_not_called()
@pytest.mark.parametrize("model_diagnostic_settings", [(True, False)], indirect=True)
async def test_agent_empty_streaming_response_via_decorator(model_diagnostic_settings):
    """A stream that yields nothing still results in the output being reported."""
    from agent_framework.telemetry import use_agent_telemetry

    class MockEmptyStreamingAgent:
        """Agent-like class whose stream ends before yielding anything."""

        AGENT_SYSTEM_NAME = "test_agent_system"

        def __init__(self):
            self.id = "test_agent_id"
            self.name = "test_agent"
            self.display_name = "Test Agent"

        async def run_streaming(self, messages=None, *, thread=None, **kwargs):
            # Finish immediately; the unreachable yield only makes this an async generator
            return
            yield  # This will never be reached

    instrumented_cls = use_agent_telemetry(MockEmptyStreamingAgent)
    instance = instrumented_cls()

    with (
        patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings),
        patch("agent_framework.telemetry.use_span") as mock_use_span,
        patch("agent_framework.telemetry._get_agent_run_span"),
        patch("agent_framework.telemetry._set_agent_run_input"),
        patch("agent_framework.telemetry._set_agent_run_output") as mock_set_output,
    ):
        span_context = mock_use_span.return_value
        span_context.__enter__.return_value = Mock()
        span_context.__exit__.return_value = None

        # Draining an empty stream must not raise
        received = [update async for update in instance.run_streaming("Test message")]

        assert len(received) == 0

        # The output hook is invoked even though there were no updates
        mock_set_output.assert_called_once()
@pytest.mark.parametrize("model_diagnostic_settings", [(True, True)], indirect=True)
async def test_agent_run_with_thread_and_kwargs(mock_chat_client_agent, model_diagnostic_settings):
    """The thread and extra keyword arguments of ``run`` reach the span factory."""
    from agent_framework.telemetry import use_agent_telemetry

    instrumented_cls = use_agent_telemetry(type(mock_chat_client_agent))
    instance = instrumented_cls()

    # Stand-in thread exposing only an id
    fake_thread = Mock()
    fake_thread.id = "test_thread_id"

    with (
        patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings),
        patch("agent_framework.telemetry.use_span") as mock_use_span,
        patch("agent_framework.telemetry._get_agent_run_span") as mock_get_span,
    ):
        span_context = mock_use_span.return_value
        span_context.__enter__.return_value = Mock()
        span_context.__exit__.return_value = None

        # Run with a thread plus a few request options
        result = await instance.run(
            "Test message", thread=fake_thread, temperature=0.7, max_tokens=100, model="test-model"
        )

        assert result is not None

        # The span factory received the agent, the thread and every option
        mock_get_span.assert_called_once()
        span_kwargs = mock_get_span.call_args[1]
        assert span_kwargs["agent"] == instance
        assert span_kwargs["thread"] == fake_thread
        assert span_kwargs["temperature"] == 0.7
        assert span_kwargs["max_tokens"] == 100
        assert span_kwargs["model"] == "test-model"
@pytest.mark.parametrize("model_diagnostic_settings", [(True, False)], indirect=True)
async def test_agent_run_with_list_messages(mock_chat_client_agent, model_diagnostic_settings):
    """A list of chat messages is handed to the input hook unchanged."""
    from agent_framework import ChatMessage, ChatRole
    from agent_framework.telemetry import use_agent_telemetry

    instrumented_cls = use_agent_telemetry(type(mock_chat_client_agent))
    instance = instrumented_cls()

    conversation = [
        ChatMessage(role=ChatRole.USER, text="First message"),
        ChatMessage(role=ChatRole.ASSISTANT, text="Response"),
        ChatMessage(role=ChatRole.USER, text="Second message"),
    ]

    with (
        patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings),
        patch("agent_framework.telemetry.use_span") as mock_use_span,
        patch("agent_framework.telemetry._set_agent_run_input") as mock_set_input,
    ):
        span_context = mock_use_span.return_value
        span_context.__enter__.return_value = Mock()
        span_context.__exit__.return_value = None

        result = await instance.run(conversation)

        assert result is not None

        # The input hook got the system name and the very list that was passed in
        mock_set_input.assert_called_once_with("test_agent_system", conversation)
@pytest.mark.parametrize("model_diagnostic_settings", [(True, False)], indirect=True)
async def test_agent_run_with_exception_handling(mock_chat_client_agent, model_diagnostic_settings):
    """An exception raised by ``run`` is written to the span and re-raised."""
    from agent_framework.telemetry import use_agent_telemetry

    async def run_with_error(self, messages=None, *, thread=None, **kwargs):
        raise RuntimeError("Agent run error")

    agent_cls = type(mock_chat_client_agent)
    agent_cls.run = run_with_error

    instrumented_cls = use_agent_telemetry(agent_cls)
    instance = instrumented_cls()

    with (
        patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings),
        patch("agent_framework.telemetry.use_span") as mock_use_span,
    ):
        mock_span = Mock()
        span_context = mock_use_span.return_value
        span_context.__enter__.return_value = mock_span
        span_context.__exit__.return_value = None

        # The failure must surface to the caller
        with pytest.raises(RuntimeError, match="Agent run error"):
            await instance.run("Test message")

        # The span received the error type attribute and an ERROR status
        expected_error = RuntimeError("Agent run error")
        mock_span.set_attribute.assert_called_once_with(GenAIAttributes.ERROR_TYPE.value, str(type(expected_error)))
        mock_span.set_status.assert_called_once_with(StatusCode.ERROR, repr(expected_error))
@@ -0,0 +1,190 @@
# Copyright (c) Microsoft. All rights reserved.
# type: ignore
import asyncio
import logging
from random import randint
from typing import Annotated
from agent_framework import ChatClientAgent
from agent_framework.openai import OpenAIChatClient
from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry import trace
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.view import DropAggregation, View
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.semconv.attributes import service_attributes
from opentelemetry.trace import SpanKind, set_tracer_provider
from pydantic import Field
from pydantic_settings import BaseSettings
class TelemetrySampleSettings(BaseSettings):
    """Settings for the telemetry sample application.

    Optional settings are:
    - connection_string: str - The connection string for the Application Insights resource.
                This value can be found in the Overview section when examining
                your resource from the Azure portal.
                (Env var CONNECTION_STRING)
    - otlp_endpoint: str - The OTLP endpoint to send telemetry data to.
                Depending on the exporter used, you may find this value in different places.
                (Env var OTLP_ENDPOINT)

    If a connection string is provided, Azure Monitor is configured when the module is loaded.
    If no OTLP endpoint is provided, the log, span and metric exporters set up by this
    sample fall back to the console.
    """

    # Application Insights connection string; None leaves Azure Monitor unconfigured.
    connection_string: str | None = None
    # OTLP endpoint for the log/span/metric exporters; None selects the console exporters.
    otlp_endpoint: str | None = None
# Load settings
# Instantiated once at import time; the set_up_* helpers below read this module-level object.
settings = TelemetrySampleSettings()

# Create a resource to represent the service/sample
# The same resource is handed to the logger, tracer and meter providers created below.
resource = Resource.create({service_attributes.SERVICE_NAME: "TelemetryExample"})

# Define the scenarios that can be run
# NOTE(review): SCENARIOS is not referenced anywhere else in this sample and looks like a
# leftover from another sample - confirm before relying on or removing it.
SCENARIOS = ["ai_service", "kernel_function", "auto_function_invocation", "all"]

# Azure Monitor is configured as an import-time side effect, and only when a connection
# string was supplied.
# NOTE(review): logger_name presumably scopes log collection to the "agent_framework"
# logger namespace - verify against the azure-monitor-opentelemetry documentation.
if settings.connection_string:
    configure_azure_monitor(
        connection_string=settings.connection_string, enable_live_metrics=True, logger_name="agent_framework"
    )
def set_up_logging():
    """Forward standard-library log records to an OpenTelemetry log exporter.

    Records are exported over OTLP when ``settings.otlp_endpoint`` is set and to the
    console otherwise. A global ``LoggerProvider`` is registered and a ``LoggingHandler``
    is attached to the root logger, whose level is set to INFO. Records whose logger
    name starts with one of the excluded namespaces are dropped by the handler.
    """

    class LogFilter(logging.Filter):
        """A filter to not process records from several subpackages."""

        # These are the namespaces that we want to exclude from logging for the purposes of this demo.
        # Stored as a tuple so it can be passed directly to `str.startswith`, which checks
        # all prefixes in one call instead of building a temporary list for every record.
        namespaces_to_exclude: tuple[str, ...] = ("httpx", "openai")

        def filter(self, record):
            """Return False for records coming from an excluded namespace, True otherwise."""
            return not record.name.startswith(self.namespaces_to_exclude)

    exporters = []
    if settings.otlp_endpoint:
        exporters.append(OTLPLogExporter(endpoint=settings.otlp_endpoint))
    if not exporters:
        # No backend configured: fall back to printing log records on the console.
        exporters.append(ConsoleLogExporter())

    # Create and set a global logger provider for the application.
    logger_provider = LoggerProvider(resource=resource)
    # Log processors are initialized with an exporter which is responsible
    # for sending the telemetry data to a particular backend.
    for log_exporter in exporters:
        logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
    # Sets the global default logger provider
    set_logger_provider(logger_provider)

    # Create a logging handler to write logging records, in OTLP format, to the exporter.
    handler = LoggingHandler()
    handler.addFilter(LogFilter())
    # Attach the handler to the root logger. `getLogger()` with no arguments returns the root logger.
    # Events from all child loggers will be processed by this handler.
    logger = logging.getLogger()
    logger.addHandler(handler)
    # Set the logging level to INFO.
    logger.setLevel(logging.INFO)
def set_up_tracing():
    """Register the global tracer provider for the sample.

    Spans go to the OTLP endpoint from ``settings.otlp_endpoint`` when one is set,
    and to the console otherwise.
    """
    endpoint = settings.otlp_endpoint
    # Exactly one exporter is used: OTLP if an endpoint is configured, else the console.
    span_exporter = OTLPSpanExporter(endpoint=endpoint) if endpoint else ConsoleSpanExporter()

    # The tracer provider is the factory for tracers; the batch span processor hands
    # finished spans to the exporter, which sends them to the chosen backend.
    provider = TracerProvider(resource=resource)
    provider.add_span_processor(BatchSpanProcessor(span_exporter))

    # Make it the global default tracer provider.
    set_tracer_provider(provider)
def set_up_metrics():
    """Register the global meter provider for the sample.

    Metrics go to the OTLP endpoint from ``settings.otlp_endpoint`` when one is set,
    and to the console otherwise, using a periodic reader with a 5000 ms export interval.
    """
    endpoint = settings.otlp_endpoint
    # Exactly one exporter is used: OTLP if an endpoint is configured, else the console.
    metric_exporter = OTLPMetricExporter(endpoint=endpoint) if endpoint else ConsoleMetricExporter()
    reader = PeriodicExportingMetricReader(metric_exporter, export_interval_millis=5000)

    # Dropping all instrument names except for those starting with "agent_framework"
    drop_everything = View(instrument_name="*", aggregation=DropAggregation())
    keep_agent_framework = View(instrument_name="agent_framework*")

    # The meter provider is the factory for meters; make it the global default.
    set_meter_provider(
        MeterProvider(
            metric_readers=[reader],
            resource=resource,
            views=[drop_everything, keep_agent_framework],
        )
    )
async def get_weather(
    location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
    """Get the weather for a given location."""
    # Simulate a network call with a random delay of 0.0 to 1.0 seconds (0.1 s steps).
    delay_seconds = randint(0, 10) / 10.0
    await asyncio.sleep(delay_seconds)

    # The "forecast" is made up: a random condition and a random high between 10 and 30.
    conditions = ["sunny", "cloudy", "rainy", "stormy"]
    condition = conditions[randint(0, 3)]
    high = randint(10, 30)
    return f"The weather in {location} is {condition} with a high of {high}°C."
async def main():
    """Run an interactive chat with a weather agent inside a single client span.

    Sets up logging, tracing and metrics, then reads user messages from stdin and
    streams the agent's replies to stdout until the user types 'exit'. Any exception
    raised during the chat loop is recorded on the scenario span and reported on stdout.
    """
    # Set up the providers
    # This must be done before any other telemetry calls
    set_up_logging()
    set_up_tracing()
    set_up_metrics()

    tracer = trace.get_tracer("agent_framework")
    with tracer.start_as_current_span("Scenario: Agent Chat", kind=SpanKind.CLIENT) as current_span:
        print("Running scenario: Agent Chat")
        print("Welcome to the chat, type 'exit' to quit.")

        agent = ChatClientAgent(
            chat_client=OpenAIChatClient(),
            tools=get_weather,
            name="WeatherAgent",
            instructions="You are a weather assistant.",
        )
        # A single thread is reused for every turn of the conversation.
        thread = agent.get_new_thread()

        user_message = input("User: ")
        try:
            while True:
                # 'exit' in any letter case ends the conversation.
                if user_message.lower() == "exit":
                    break
                print(f"{agent.display_name}: ", end="")
                # Print each streamed update as it arrives, skipping updates without text.
                async for update in agent.run_streaming(user_message, thread=thread):
                    if update.text:
                        print(update.text, end="")
                user_message = input("\nUser: ")
        except Exception as exc:
            # Attach the failure to the scenario span before reporting it to the user.
            current_span.record_exception(exc)
            print(f"\nError running interactive chat: {exc}")


if __name__ == "__main__":
    asyncio.run(main())
@@ -23,7 +23,7 @@ from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.semconv.attributes import service_attributes
from opentelemetry.trace import set_tracer_provider
from opentelemetry.trace import SpanKind, set_tracer_provider
from pydantic import Field
from pydantic_settings import BaseSettings
@@ -98,7 +98,7 @@ def set_up_logging():
# Events from all child loggers will be processed by this handler.
logger = logging.getLogger()
logger.addHandler(handler)
# Set the logging level to NOTSET to allow all records to be processed by the handler.
# Set the logging level to WARNING, this will not log detailed events to the logger.
logger.setLevel(logging.WARNING)
@@ -159,7 +159,7 @@ async def main():
set_up_metrics()
tracer = trace.get_tracer("agent_framework")
with tracer.start_as_current_span("Scenario: Interactive Chat") as current_span:
with tracer.start_as_current_span("Scenario: Interactive Chat", kind=SpanKind.CLIENT) as current_span:
print("Running scenario: Interactive Chat")
print("Welcome to the chat, type 'exit' to quit.")
client = OpenAIChatClient()
@@ -24,7 +24,7 @@ from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.semconv.attributes import service_attributes
from opentelemetry.trace import set_tracer_provider
from opentelemetry.trace import SpanKind, set_tracer_provider
from opentelemetry.trace.span import format_trace_id
from pydantic import Field
from pydantic_settings import BaseSettings
@@ -170,7 +170,7 @@ async def run_chat_client(stream: bool = False) -> None:
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span(
"Scenario: Chat Client Stream" if stream else "Scenario: Chat Client"
"Scenario: Chat Client Stream" if stream else "Scenario: Chat Client", kind=SpanKind.CLIENT
) as current_span:
print("Running scenario: Chat Client" if not stream else "Running scenario: Chat Client Stream")
try:
@@ -203,7 +203,7 @@ async def run_ai_function() -> None:
"""
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("Scenario: AI Function") as current_span:
with tracer.start_as_current_span("Scenario: AI Function", kind=SpanKind.CLIENT) as current_span:
print("Running scenario: AI Function")
try:
func = ai_function(get_weather)
@@ -222,7 +222,7 @@ async def main(scenario: Literal["chat_client", "chat_client_stream", "ai_functi
set_up_metrics()
tracer = trace.get_tracer("agent_framework")
with tracer.start_as_current_span("Scenario's") as current_span:
with tracer.start_as_current_span("Scenario's", kind=SpanKind.CLIENT) as current_span:
print(f"Trace ID: {format_trace_id(current_span.get_span_context().trace_id)}")
# Scenarios where telemetry is collected in the SDK, from the most basic to the most complex.