From caee8bfa90600fb417039b01f4e9795946dc4673 Mon Sep 17 00:00:00 2001 From: Eduard van Valkenburg Date: Thu, 31 Jul 2025 12:29:38 +0200 Subject: [PATCH] Python: add agent telemetry (#283) * add agent telemetry * updated comments * update AgentRunResponseUpdate * updated create_agent var --------- Co-authored-by: Chris <66376200+crickman@users.noreply.github.com> --- .../packages/main/agent_framework/_agents.py | 9 +- .../main/agent_framework/telemetry.py | 333 +++++++++++++-- .../main/tests/main/test_telemetry.py | 398 +++++++++++++++++- .../getting_started/telemetry/agent.py | 190 +++++++++ .../getting_started/telemetry/interactive.py | 6 +- .../getting_started/telemetry/scenarios.py | 8 +- 6 files changed, 907 insertions(+), 37 deletions(-) create mode 100644 python/samples/getting_started/telemetry/agent.py diff --git a/python/packages/main/agent_framework/_agents.py b/python/packages/main/agent_framework/_agents.py index c9566bff77..abe98e48ad 100644 --- a/python/packages/main/agent_framework/_agents.py +++ b/python/packages/main/agent_framework/_agents.py @@ -3,7 +3,7 @@ import sys from collections.abc import AsyncIterable, Callable, MutableMapping, Sequence from enum import Enum -from typing import Any, Literal, Protocol, TypeVar, runtime_checkable +from typing import Any, ClassVar, Literal, Protocol, TypeVar, runtime_checkable from uuid import uuid4 from pydantic import BaseModel, Field @@ -22,6 +22,7 @@ from ._types import ( ChatToolMode, ) from .exceptions import AgentExecutionException +from .telemetry import use_agent_telemetry if sys.version_info >= (3, 11): from typing import Self # pragma: no cover @@ -305,9 +306,11 @@ class ChatClientAgentThread(AgentThread): # region ChatClientAgent +@use_agent_telemetry class ChatClientAgent(AgentBase): """A Chat Client Agent.""" + AGENT_SYSTEM_NAME: ClassVar[str] = "microsoft.agent_framework" chat_client: ChatClient instructions: str | None = None chat_options: ChatOptions @@ -525,7 +528,7 @@ class ChatClientAgent(AgentBase): response_id=response.response_id, created_at=response.created_at, usage_details=response.usage_details, - raw_representation=response.raw_representation, + raw_representation=response, additional_properties=response.additional_properties, ) @@ -626,7 +629,7 @@ class ChatClientAgent(AgentBase): message_id=update.message_id, created_at=update.created_at, additional_properties=update.additional_properties, - raw_representation=update.raw_representation, + raw_representation=update, ) response = ChatResponse.from_chat_response_updates(response_updates) diff --git a/python/packages/main/agent_framework/telemetry.py b/python/packages/main/agent_framework/telemetry.py index b09275bc15..f4b6ecf581 100644 --- a/python/packages/main/agent_framework/telemetry.py +++ b/python/packages/main/agent_framework/telemetry.py @@ -18,11 +18,20 @@ from ._pydantic import AFBaseSettings if TYPE_CHECKING: # pragma: no cover from opentelemetry.util._decorator import _AgnosticContextManager # type: ignore[reportPrivateUsage] + from ._agents import AgentThread, AIAgent, ChatClientAgent from ._clients import ChatClientBase from ._tools import AIFunction - from ._types import ChatMessage, ChatOptions, ChatResponse, ChatResponseUpdate + from ._types import ( + AgentRunResponse, + AgentRunResponseUpdate, + ChatMessage, + ChatOptions, + ChatResponse, + ChatResponseUpdate, + ) TChatClientBase = TypeVar("TChatClientBase", bound="ChatClientBase") +TChatClientAgent = TypeVar("TChatClientAgent", bound="ChatClientAgent") tracer = get_tracer("agent_framework") logger = get_logger() @@ -32,6 +41,7 @@ __all__ = [ "APP_INFO", "USER_AGENT_KEY", "prepend_agent_framework_to_user_agent", + "use_agent_telemetry", "use_telemetry", ] @@ -66,7 +76,8 @@ logger.addFilter(ChatMessageListTimestampFilter()) class GenAIAttributes(str, Enum): """Enum to capture the attributes used in OpenTelemetry for Generative AI. - Based on: https://opentelemetry.io/docs/concepts/semantic-conventions/ + Based on: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/ + and https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-agent-spans/ Should always be used, with `.value` to get the string representation. """ @@ -74,24 +85,40 @@ class GenAIAttributes(str, Enum): OPERATION = "gen_ai.operation.name" SYSTEM = "gen_ai.system" ERROR_TYPE = "error.type" + PORT = "server.port" + ADDRESS = "server.address" + SPAN_ID = "SpanId" + TRACE_ID = "TraceId" + # Request attributes MODEL = "gen_ai.request.model" SEED = "gen_ai.request.seed" - PORT = "server.port" ENCODING_FORMATS = "gen_ai.request.encoding_formats" FREQUENCY_PENALTY = "gen_ai.request.frequency_penalty" MAX_TOKENS = "gen_ai.request.max_tokens" + PRESENCE_PENALTY = "gen_ai.request.presence_penalty" STOP_SEQUENCES = "gen_ai.request.stop_sequences" TEMPERATURE = "gen_ai.request.temperature" TOP_K = "gen_ai.request.top_k" TOP_P = "gen_ai.request.top_p" - FINISH_REASON = "gen_ai.response.finish_reason" + CHOICE_COUNT = "gen_ai.request.choice.count" + # Response attributes + FINISH_REASONS = "gen_ai.response.finish_reasons" RESPONSE_ID = "gen_ai.response.id" + RESPONSE_MODEL = "gen_ai.response.model" + # Usage attributes INPUT_TOKENS = "gen_ai.usage.input_tokens" OUTPUT_TOKENS = "gen_ai.usage.output_tokens" + # Tool attributes TOOL_CALL_ID = "gen_ai.tool.call.id" TOOL_DESCRIPTION = "gen_ai.tool.description" TOOL_NAME = "gen_ai.tool.name" - ADDRESS = "server.address" + AGENT_ID = "gen_ai.agent.id" + # Agent attributes + AGENT_NAME = "gen_ai.agent.name" + AGENT_DESCRIPTION = "gen_ai.agent.description" + CONVERSATION_ID = "gen_ai.conversation.id" + DATA_SOURCE_ID = "gen_ai.data_source.id" + OUTPUT_TYPE = "gen_ai.output.type" # Activity events EVENT_NAME = "event.name" @@ -103,12 +130,15 @@ class GenAIAttributes(str, Enum): PROMPT = "gen_ai.prompt" # Operation names - CHAT_COMPLETION_OPERATION = "chat.completions" - CHAT_STREAMING_COMPLETION_OPERATION = "chat.streaming_completions" + CHAT_COMPLETION_OPERATION = "chat" TOOL_EXECUTION_OPERATION = "execute_tool" + # Describes GenAI agent creation and is usually applicable when working with remote agent services. + AGENT_CREATE_OPERATION = "create_agent" + AGENT_INVOKE_OPERATION = "invoke_agent" # Agent Framework specific attributes MEASUREMENT_FUNCTION_TAG_NAME = "agent_framework.function.name" + AGENT_FRAMEWORK_GEN_AI_SYSTEM = "microsoft.agent_framework" ROLE_EVENT_MAP = { @@ -151,6 +181,9 @@ def prepend_agent_framework_to_user_agent(headers: dict[str, Any]) -> dict[str, return headers +# region Telemetry utils + + class ModelDiagnosticSettings(AFBaseSettings): """Settings for model diagnostics. @@ -228,6 +261,15 @@ def start_as_current_span( ) +def _set_error(span: Span, error: Exception) -> None: + """Set an error for spans.""" + span.set_attribute(GenAIAttributes.ERROR_TYPE.value, str(type(error))) + span.set_status(StatusCode.ERROR, repr(error)) + + +# region ChatClient + + def _trace_chat_get_response( completion_func: Callable[..., Awaitable["ChatResponse"]], ) -> Callable[..., Awaitable["ChatResponse"]]: @@ -270,7 +312,7 @@ def _trace_chat_get_response( _set_chat_response_output(current_span, response, self.MODEL_PROVIDER_NAME) return response except Exception as exception: - _set_chat_response_error(current_span, exception) + _set_error(current_span, exception) raise # Mark the wrapper decorator as a chat completion decorator @@ -306,7 +348,7 @@ def _trace_chat_get_streaming_response( with use_span( _get_chat_response_span( - GenAIAttributes.CHAT_STREAMING_COMPLETION_OPERATION.value, + GenAIAttributes.CHAT_COMPLETION_OPERATION.value, getattr(self, "ai_model_id", chat_options.ai_model_id or "unknown"), self.MODEL_PROVIDER_NAME, self.service_url() if hasattr(self, "service_url") else None, @@ -323,7 +365,7 @@ def _trace_chat_get_streaming_response( all_messages_flattened = ChatResponse.from_chat_response_updates(all_updates) _set_chat_response_output(current_span, all_messages_flattened, self.MODEL_PROVIDER_NAME) except Exception as exception: - _set_chat_response_error(current_span, exception) + _set_error(current_span, exception) raise # Mark the wrapper decorator as a streaming chat completion decorator @@ -367,6 +409,7 @@ def _get_chat_response_span( GenAIAttributes.OPERATION.value: operation_name, GenAIAttributes.SYSTEM.value: model_provider, GenAIAttributes.MODEL.value: model_name, + GenAIAttributes.CHOICE_COUNT.value: 1, }) if service_url: @@ -384,6 +427,8 @@ def _get_chat_response_span( span.set_attribute(GenAIAttributes.TEMPERATURE.value, chat_options.temperature) if chat_options.top_p is not None: span.set_attribute(GenAIAttributes.TOP_P.value, chat_options.top_p) + if chat_options.presence_penalty is not None: + span.set_attribute(GenAIAttributes.PRESENCE_PENALTY.value, chat_options.presence_penalty) if "top_k" in chat_options.additional_properties: span.set_attribute(GenAIAttributes.TOP_K.value, chat_options.additional_properties["top_k"]) if "encoding_formats" in chat_options.additional_properties: @@ -404,15 +449,14 @@ def _set_chat_response_input( if MODEL_DIAGNOSTICS_SETTINGS.SENSITIVE_EVENTS_ENABLED: for idx, message in enumerate(messages): event_name = ROLE_EVENT_MAP.get(message.role.value) - if event_name: - logger.info( - message.model_dump_json(exclude_none=True), - extra={ - GenAIAttributes.EVENT_NAME.value: event_name, - GenAIAttributes.SYSTEM.value: model_provider, - ChatMessageListTimestampFilter.INDEX_KEY: idx, - }, - ) + logger.info( + message.model_dump_json(exclude_none=True), + extra={ + GenAIAttributes.EVENT_NAME.value: event_name, + GenAIAttributes.SYSTEM.value: model_provider, + ChatMessageListTimestampFilter.INDEX_KEY: idx, + }, + ) def _set_chat_response_output( @@ -433,7 +477,7 @@ def _set_chat_response_output( # Set the finish reason finish_reason = response.finish_reason if finish_reason: - current_span.set_attribute(GenAIAttributes.FINISH_REASON.value, finish_reason.value) + current_span.set_attribute(GenAIAttributes.FINISH_REASONS.value, [finish_reason.value]) # Set usage attributes @@ -460,7 +504,248 @@ def _set_chat_response_output( ) -def _set_chat_response_error(span: Span, error: Exception) -> None: - """Set an error for chat client responses.""" - span.set_attribute(GenAIAttributes.ERROR_TYPE.value, str(type(error))) - span.set_status(StatusCode.ERROR, repr(error)) +# region Agent + + +def _trace_agent_run( + run_func: Callable[..., Awaitable["AgentRunResponse"]], +) -> Callable[..., Awaitable["AgentRunResponse"]]: + """Decorator to trace chat completion activities. + + Args: + run_func: The function to trace. + """ + + @functools.wraps(run_func) + async def wrap_run( + self: "ChatClientAgent", + messages: "str | ChatMessage | list[str] | list[ChatMessage] | None" = None, + *, + thread: "AgentThread | None" = None, + **kwargs: Any, + ) -> "AgentRunResponse": + if not MODEL_DIAGNOSTICS_SETTINGS.ENABLED: + # If model diagnostics are not enabled, just return the completion + return await run_func( + self, + messages=messages, + thread=thread, + **kwargs, + ) + + with use_span( + _get_agent_run_span( + operation_name=GenAIAttributes.AGENT_INVOKE_OPERATION.value, + agent=self, + system=self.AGENT_SYSTEM_NAME, + thread=thread, + **kwargs, + ), + end_on_exit=True, + ) as current_span: + _set_agent_run_input(self.AGENT_SYSTEM_NAME, messages) + try: + response = await run_func(self, messages=messages, thread=thread, **kwargs) + _set_agent_run_output(current_span, response, self.AGENT_SYSTEM_NAME) + return response + except Exception as exception: + _set_error(current_span, exception) + raise + + # Mark the wrapper decorator as a agent run decorator + wrap_run.__model_diagnostics_agent_run__ = True # type: ignore + + return wrap_run + + +def _trace_agent_run_streaming( + run_func: Callable[..., AsyncIterable["AgentRunResponseUpdate"]], +) -> Callable[..., AsyncIterable["AgentRunResponseUpdate"]]: + """Decorator to trace streaming agent run activities. + + Args: + run_func: The function to trace. + """ + + @functools.wraps(run_func) + async def wrap_run_streaming( + self: "ChatClientAgent", + messages: "str | ChatMessage | list[str] | list[ChatMessage] | None" = None, + *, + thread: "AgentThread | None" = None, + **kwargs: Any, + ) -> AsyncIterable["AgentRunResponseUpdate"]: + if not MODEL_DIAGNOSTICS_SETTINGS.ENABLED: + # If model diagnostics are not enabled, just return the completion + async for streaming_agent_response in run_func(self, messages=messages, thread=thread, **kwargs): + yield streaming_agent_response + return + + from ._types import AgentRunResponse + + all_updates: list["AgentRunResponseUpdate"] = [] + + with use_span( + _get_agent_run_span( + operation_name=GenAIAttributes.AGENT_INVOKE_OPERATION.value, + agent=self, + system=self.AGENT_SYSTEM_NAME, + thread=thread, + **kwargs, + ), + end_on_exit=True, + ) as current_span: + _set_agent_run_input(self.AGENT_SYSTEM_NAME, messages) + try: + async for response in run_func(self, messages=messages, thread=thread, **kwargs): + all_updates.append(response) + yield response + + all_messages_flattened = AgentRunResponse.from_agent_run_response_updates(all_updates) + _set_agent_run_output(current_span, all_messages_flattened, self.AGENT_SYSTEM_NAME) + except Exception as exception: + _set_error(current_span, exception) + raise + + # Mark the wrapper decorator as a streaming agent run decorator + wrap_run_streaming.__model_diagnostics_streaming_agent_run__ = True # type: ignore + return wrap_run_streaming + + +def use_agent_telemetry(cls: type[TChatClientAgent]) -> type[TChatClientAgent]: + """Class decorator that enables telemetry for an agent.""" + if run := getattr(cls, "run", None): + cls.run = _trace_agent_run(run) # type: ignore + if run_streaming := getattr(cls, "run_streaming", None): + cls.run_streaming = _trace_agent_run_streaming(run_streaming) # type: ignore + return cls + + +def _get_agent_run_span( + *, + operation_name: str, + agent: "AIAgent", + system: str, + thread: "AgentThread | None", + **kwargs: Any, +) -> Span: + """Start a text or chat completion span for a given model. + + Note that `start_span` doesn't make the span the current span. + Use `use_span` to make it the current span as a context manager. + + Should follow: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-agent-spans/#invoke-agent-span + """ + span = tracer.start_span(f"{operation_name} {agent.display_name}") + + # Set attributes on the span + span.set_attributes({ + GenAIAttributes.OPERATION.value: operation_name, + GenAIAttributes.SYSTEM.value: system, + GenAIAttributes.CHOICE_COUNT.value: 1, + GenAIAttributes.AGENT_ID.value: agent.id, + }) + if agent.name: + span.set_attribute(GenAIAttributes.AGENT_NAME.value, agent.name) + if agent.description: + span.set_attribute(GenAIAttributes.AGENT_DESCRIPTION.value, agent.description) + if thread and thread.id: + span.set_attribute(GenAIAttributes.CONVERSATION_ID.value, thread.id) + if "model" in kwargs: + span.set_attribute(GenAIAttributes.MODEL.value, kwargs["model"]) + if "seed" in kwargs: + span.set_attribute(GenAIAttributes.SEED.value, kwargs["seed"]) + if "frequency_penalty" in kwargs: + span.set_attribute(GenAIAttributes.FREQUENCY_PENALTY.value, kwargs["frequency_penalty"]) + if "presence_penalty" in kwargs: + span.set_attribute(GenAIAttributes.PRESENCE_PENALTY.value, kwargs["presence_penalty"]) + if "max_tokens" in kwargs: + span.set_attribute(GenAIAttributes.MAX_TOKENS.value, kwargs["max_tokens"]) + if "stop" in kwargs: + span.set_attribute(GenAIAttributes.STOP_SEQUENCES.value, kwargs["stop"]) + if "temperature" in kwargs: + span.set_attribute(GenAIAttributes.TEMPERATURE.value, kwargs["temperature"]) + if "top_p" in kwargs: + span.set_attribute(GenAIAttributes.TOP_P.value, kwargs["top_p"]) + if "top_k" in kwargs: + span.set_attribute(GenAIAttributes.TOP_K.value, kwargs["top_k"]) + if "encoding_formats" in kwargs: + span.set_attribute(GenAIAttributes.ENCODING_FORMATS.value, kwargs["encoding_formats"]) + return span + + +def _set_agent_run_input( + system: str, + messages: "str | ChatMessage | list[str] | list[ChatMessage] | list[str | ChatMessage] | None" = None, +) -> None: + """Set the input for a chat response. + + The logs will be associated to the current span. + """ + if messages and MODEL_DIAGNOSTICS_SETTINGS.SENSITIVE_EVENTS_ENABLED: + if not isinstance(messages, list): + messages = [messages] + for idx, message in enumerate(messages): + if isinstance(message, str): + logger.info( + message, + extra={ + # assume user message + GenAIAttributes.EVENT_NAME.value: GenAIAttributes.USER_MESSAGE.value, + GenAIAttributes.SYSTEM.value: system, + ChatMessageListTimestampFilter.INDEX_KEY: idx, + }, + ) + else: + logger.info( + message.model_dump_json(exclude_none=True), + extra={ + GenAIAttributes.EVENT_NAME.value: ROLE_EVENT_MAP.get(message.role.value), + GenAIAttributes.SYSTEM.value: system, + ChatMessageListTimestampFilter.INDEX_KEY: idx, + }, + ) + + +def _set_agent_run_output( + current_span: Span, + response: "AgentRunResponse", + model_provider: str, +) -> None: + """Set the agent response for a given span.""" + first_completion = response.messages[0] + + # Set the response ID + response_id = ( + first_completion.additional_properties.get("id") if first_completion.additional_properties is not None else None + ) + if response_id: + current_span.set_attribute(GenAIAttributes.RESPONSE_ID.value, response_id) + + # Set the finish reason + finish_reason = getattr(response.raw_representation, "finish_reason", None) if response.raw_representation else None + if finish_reason: + current_span.set_attribute(GenAIAttributes.FINISH_REASONS.value, [finish_reason.value]) + + # Set usage attributes + usage = response.usage_details + if usage: + if usage.input_token_count: + current_span.set_attribute(GenAIAttributes.INPUT_TOKENS.value, usage.input_token_count) + if usage.output_token_count: + current_span.set_attribute(GenAIAttributes.OUTPUT_TOKENS.value, usage.output_token_count) + + # Set the completion event + if MODEL_DIAGNOSTICS_SETTINGS.SENSITIVE_EVENTS_ENABLED: + for msg in response.messages: + full_response: dict[str, Any] = { + "message": msg.model_dump(exclude_none=True), + } + full_response["index"] = response.response_id + logger.info( + json.dumps(full_response), + extra={ + GenAIAttributes.EVENT_NAME.value: GenAIAttributes.CHOICE.value, + GenAIAttributes.SYSTEM.value: model_provider, + }, + ) diff --git a/python/packages/main/tests/main/test_telemetry.py b/python/packages/main/tests/main/test_telemetry.py index f3064b1241..0d00c35325 100644 --- a/python/packages/main/tests/main/test_telemetry.py +++ b/python/packages/main/tests/main/test_telemetry.py @@ -6,6 +6,7 @@ from typing import Any from unittest.mock import Mock, patch import pytest +from opentelemetry.trace import StatusCode from agent_framework import ( ChatMessage, @@ -88,9 +89,9 @@ def test_enum_values(): assert GenAIAttributes.OPERATION.value == "gen_ai.operation.name" assert GenAIAttributes.SYSTEM.value == "gen_ai.system" assert GenAIAttributes.MODEL.value == "gen_ai.request.model" - assert GenAIAttributes.CHAT_COMPLETION_OPERATION.value == "chat.completions" - assert GenAIAttributes.CHAT_STREAMING_COMPLETION_OPERATION.value == "chat.streaming_completions" + assert GenAIAttributes.CHAT_COMPLETION_OPERATION.value == "chat" assert GenAIAttributes.TOOL_EXECUTION_OPERATION.value == "execute_tool" + assert GenAIAttributes.AGENT_INVOKE_OPERATION.value == "invoke_agent" # region Test prepend_agent_framework_to_user_agent @@ -485,7 +486,7 @@ async def test_streaming_response_with_exception_via_decorator(mock_chat_client, patch("agent_framework.telemetry.use_span") as mock_use_span, patch("agent_framework.telemetry._get_chat_response_span"), patch("agent_framework.telemetry._set_chat_response_input"), - patch("agent_framework.telemetry._set_chat_response_error") as mock_set_error, + patch("agent_framework.telemetry._set_error") as mock_set_error, ): mock_span = Mock() mock_use_span.return_value.__enter__.return_value = mock_span @@ -610,3 +611,394 @@ def test_prepend_user_agent_with_none_value(): # Should handle None gracefully assert "User-Agent" in result assert AGENT_FRAMEWORK_USER_AGENT in str(result["User-Agent"]) + + +# region Test use_agent_telemetry decorator + + +def test_agent_decorator_with_valid_class(): + """Test that agent decorator works with a valid ChatClientAgent-like class.""" + from agent_framework.telemetry import use_agent_telemetry + + # Create a mock class with the required methods + class MockChatClientAgent: + AGENT_SYSTEM_NAME = "test_agent_system" + + def __init__(self): + self.id = "test_agent_id" + self.name = "test_agent" + self.display_name = "Test Agent" + self.description = "Test agent description" + + async def run(self, messages=None, *, thread=None, **kwargs): + return Mock() + + async def run_streaming(self, messages=None, *, thread=None, **kwargs): + async def gen(): + yield Mock() + + return gen() + + # Apply the decorator + decorated_class = use_agent_telemetry(MockChatClientAgent) + + # Check that the methods were wrapped + assert hasattr(decorated_class.run, "__model_diagnostics_agent_run__") + assert hasattr(decorated_class.run_streaming, "__model_diagnostics_streaming_agent_run__") + + +def test_agent_decorator_with_missing_methods(): + """Test that agent decorator handles classes missing required methods gracefully.""" + from agent_framework.telemetry import use_agent_telemetry + + class MockChatClientAgent: + AGENT_SYSTEM_NAME = "test_agent_system" + + # Apply the decorator - should not raise an error + decorated_class = use_agent_telemetry(MockChatClientAgent) + + # Class should be returned unchanged + assert decorated_class is MockChatClientAgent + + +def test_agent_decorator_with_partial_methods(): + """Test agent decorator when only one method is present.""" + from agent_framework.telemetry import use_agent_telemetry + + class MockChatClientAgent: + AGENT_SYSTEM_NAME = "test_agent_system" + + def __init__(self): + self.id = "test_agent_id" + self.name = "test_agent" + self.display_name = "Test Agent" + + async def run(self, messages=None, *, thread=None, **kwargs): + return Mock() + + decorated_class = use_agent_telemetry(MockChatClientAgent) + + # Only the present method should be wrapped + assert hasattr(decorated_class.run, "__model_diagnostics_agent_run__") + assert not hasattr(decorated_class, "run_streaming") + + +# region Test agent telemetry decorator with mock agent + + +@pytest.fixture +def mock_chat_client_agent(): + """Create a mock chat client agent for testing.""" + from agent_framework import AgentRunResponse, ChatMessage, ChatRole, UsageDetails + + class MockChatClientAgent: + AGENT_SYSTEM_NAME = "test_agent_system" + + def __init__(self): + self.id = "test_agent_id" + self.name = "test_agent" + self.display_name = "Test Agent" + self.description = "Test agent description" + + async def run(self, messages=None, *, thread=None, **kwargs): + return AgentRunResponse( + messages=[ChatMessage(role=ChatRole.ASSISTANT, text="Agent response")], + usage_details=UsageDetails(input_token_count=15, output_token_count=25), + response_id="test_response_id", + raw_representation=Mock(finish_reason=Mock(value="stop")), + ) + + async def run_streaming(self, messages=None, *, thread=None, **kwargs): + from agent_framework import AgentRunResponseUpdate + + yield AgentRunResponseUpdate(text="Hello", role=ChatRole.ASSISTANT) + yield AgentRunResponseUpdate(text=" from agent", role=ChatRole.ASSISTANT) + + return MockChatClientAgent() + + +@pytest.mark.parametrize("model_diagnostic_settings", [(False, False)], indirect=True) +async def test_agent_telemetry_disabled_bypasses_instrumentation(mock_chat_client_agent, model_diagnostic_settings): + """Test that when agent diagnostics are disabled, telemetry is bypassed.""" + from agent_framework.telemetry import use_agent_telemetry + + decorated_class = use_agent_telemetry(type(mock_chat_client_agent)) + agent = decorated_class() + + with ( + patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings), + patch("agent_framework.telemetry.use_span") as mock_use_span, + ): + # This should not create any spans + response = await agent.run("Test message") + assert response is not None + mock_use_span.assert_not_called() + + +@pytest.mark.parametrize("model_diagnostic_settings", [(True, True)], indirect=True) +async def test_agent_instrumentation_enabled(mock_chat_client_agent, model_diagnostic_settings): + """Test that when agent diagnostics are enabled, telemetry is applied.""" + from agent_framework.telemetry import use_agent_telemetry + + decorated_class = use_agent_telemetry(type(mock_chat_client_agent)) + agent = decorated_class() + + with ( + patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings), + patch("agent_framework.telemetry.use_span") as mock_use_span, + patch("agent_framework.telemetry.logger") as mock_logger, + ): + response = await agent.run("Test message") + assert response is not None + mock_use_span.assert_called_once() + # Check that logger.info was called (telemetry logs input/output) + assert mock_logger.info.call_count == 2 + + +@pytest.mark.parametrize("model_diagnostic_settings", [(True, False)], indirect=True) +async def test_agent_streaming_response_with_diagnostics_enabled_via_decorator( + mock_chat_client_agent, model_diagnostic_settings +): + """Test agent streaming telemetry through the use_agent_telemetry decorator.""" + from agent_framework.telemetry import use_agent_telemetry + + decorated_class = use_agent_telemetry(type(mock_chat_client_agent)) + agent = decorated_class() + + with ( + patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings), + patch("agent_framework.telemetry.use_span") as mock_use_span, + patch("agent_framework.telemetry._get_agent_run_span") as mock_get_span, + patch("agent_framework.telemetry._set_agent_run_input") as mock_set_input, + patch("agent_framework.telemetry._set_agent_run_output") as mock_set_output, + ): + mock_span = Mock() + mock_use_span.return_value.__enter__.return_value = mock_span + mock_use_span.return_value.__exit__.return_value = None + + # Collect all yielded updates + updates = [] + async for update in agent.run_streaming("Test message"): + updates.append(update) + + # Verify we got the expected updates + assert len(updates) == 2 + + # Verify telemetry calls were made + mock_get_span.assert_called_once() + mock_set_input.assert_called_once_with("test_agent_system", "Test message") + mock_set_output.assert_called_once() + + +@pytest.mark.parametrize("model_diagnostic_settings", [(True, False)], indirect=True) +async def test_agent_streaming_response_with_exception_via_decorator(mock_chat_client_agent, model_diagnostic_settings): + """Test agent streaming telemetry exception handling through decorator.""" + from agent_framework.telemetry import use_agent_telemetry + + async def run_streaming(self, messages=None, *, thread=None, **kwargs): + from agent_framework import AgentRunResponseUpdate, ChatRole + + yield AgentRunResponseUpdate(text="Partial", role=ChatRole.ASSISTANT) + raise ValueError("Test agent streaming error") + + type(mock_chat_client_agent).run_streaming = run_streaming + + decorated_class = use_agent_telemetry(type(mock_chat_client_agent)) + agent = decorated_class() + + with ( + patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings), + patch("agent_framework.telemetry.use_span") as mock_use_span, + patch("agent_framework.telemetry._get_agent_run_span"), + patch("agent_framework.telemetry._set_agent_run_input"), + patch("agent_framework.telemetry._set_error") as mock_set_error, + ): + mock_span = Mock() + mock_use_span.return_value.__enter__.return_value = mock_span + mock_use_span.return_value.__exit__.return_value = None + + # Should raise the exception and call error handler + with pytest.raises(ValueError, match="Test agent streaming error"): + async for _ in agent.run_streaming("Test message"): + pass + + # Verify error was recorded + mock_set_error.assert_called_once() + assert isinstance(mock_set_error.call_args[0][1], ValueError) + + +@pytest.mark.parametrize("model_diagnostic_settings", [(False, False)], indirect=True) +async def test_agent_streaming_response_diagnostics_disabled_via_decorator(model_diagnostic_settings): + """Test agent streaming response when diagnostics are disabled.""" + from agent_framework import AgentRunResponseUpdate, ChatRole + from agent_framework.telemetry import use_agent_telemetry + + class MockStreamingAgentNoDiagnostics: + AGENT_SYSTEM_NAME = "test_agent_system" + + def __init__(self): + self.id = "test_agent_id" + self.name = "test_agent" + self.display_name = "Test Agent" + + async def run_streaming(self, messages=None, *, thread=None, **kwargs): + yield AgentRunResponseUpdate(text="Test", role=ChatRole.ASSISTANT) + + decorated_class = use_agent_telemetry(MockStreamingAgentNoDiagnostics) + agent = decorated_class() + + with ( + patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings), + patch("agent_framework.telemetry._get_agent_run_span") as mock_get_span, + ): + # Should not create spans when diagnostics are disabled + updates = [] + async for update in agent.run_streaming("Test message"): + updates.append(update) + + assert len(updates) == 1 + # Should not have called telemetry functions + mock_get_span.assert_not_called() + + +@pytest.mark.parametrize("model_diagnostic_settings", [(True, False)], indirect=True) +async def test_agent_empty_streaming_response_via_decorator(model_diagnostic_settings): + """Test agent streaming wrapper with empty response.""" + from agent_framework.telemetry import use_agent_telemetry + + class MockEmptyStreamingAgent: + AGENT_SYSTEM_NAME = "test_agent_system" + + def __init__(self): + self.id = "test_agent_id" + self.name = "test_agent" + self.display_name = "Test Agent" + + async def run_streaming(self, messages=None, *, thread=None, **kwargs): + # Return empty stream + return + yield # This will never be reached + + decorated_class = use_agent_telemetry(MockEmptyStreamingAgent) + agent = decorated_class() + + with ( + patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings), + patch("agent_framework.telemetry.use_span") as mock_use_span, + patch("agent_framework.telemetry._get_agent_run_span"), + patch("agent_framework.telemetry._set_agent_run_input"), + patch("agent_framework.telemetry._set_agent_run_output") as mock_set_output, + ): + mock_span = Mock() + mock_use_span.return_value.__enter__.return_value = mock_span + mock_use_span.return_value.__exit__.return_value = None + + # Should handle empty stream gracefully + updates = [] + async for update in agent.run_streaming("Test message"): + updates.append(update) + + assert len(updates) == 0 + # Should still call telemetry + mock_set_output.assert_called_once() + + +@pytest.mark.parametrize("model_diagnostic_settings", [(True, True)], indirect=True) +async def test_agent_run_with_thread_and_kwargs(mock_chat_client_agent, model_diagnostic_settings): + """Test agent run with thread and additional kwargs.""" + from agent_framework.telemetry import use_agent_telemetry + + decorated_class = use_agent_telemetry(type(mock_chat_client_agent)) + agent = decorated_class() + + # Mock thread + mock_thread = Mock() + mock_thread.id = "test_thread_id" + + with ( + patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings), + patch("agent_framework.telemetry.use_span") as mock_use_span, + patch("agent_framework.telemetry._get_agent_run_span") as mock_get_span, + ): + mock_span = Mock() + mock_use_span.return_value.__enter__.return_value = mock_span + mock_use_span.return_value.__exit__.return_value = None + + # Test with thread and additional kwargs + response = await agent.run( + "Test message", thread=mock_thread, temperature=0.7, max_tokens=100, model="test-model" + ) + assert response is not None + + # Verify the span was created with the correct parameters + mock_get_span.assert_called_once() + call_kwargs = mock_get_span.call_args[1] + assert call_kwargs["agent"] == agent + assert call_kwargs["thread"] == mock_thread + assert call_kwargs["temperature"] == 0.7 + assert call_kwargs["max_tokens"] == 100 + assert call_kwargs["model"] == "test-model" + + +@pytest.mark.parametrize("model_diagnostic_settings", [(True, False)], indirect=True) +async def test_agent_run_with_list_messages(mock_chat_client_agent, model_diagnostic_settings): + """Test agent run with list of messages.""" + from agent_framework import ChatMessage, ChatRole + from agent_framework.telemetry import use_agent_telemetry + + decorated_class = use_agent_telemetry(type(mock_chat_client_agent)) + agent = decorated_class() + + messages = [ + ChatMessage(role=ChatRole.USER, text="First message"), + ChatMessage(role=ChatRole.ASSISTANT, text="Response"), + ChatMessage(role=ChatRole.USER, text="Second message"), + ] + + with ( + patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings), + patch("agent_framework.telemetry.use_span") as mock_use_span, + patch("agent_framework.telemetry._set_agent_run_input") as mock_set_input, + ): + mock_span = Mock() + mock_use_span.return_value.__enter__.return_value = mock_span + mock_use_span.return_value.__exit__.return_value = None + + response = await agent.run(messages) + assert response is not None + + # Verify input was set with the list of messages + mock_set_input.assert_called_once_with("test_agent_system", messages) + + +@pytest.mark.parametrize("model_diagnostic_settings", [(True, False)], indirect=True) +async def test_agent_run_with_exception_handling(mock_chat_client_agent, model_diagnostic_settings): + """Test agent run with exception handling.""" + from agent_framework.telemetry import use_agent_telemetry + + async def run_with_error(self, messages=None, *, thread=None, **kwargs): + raise RuntimeError("Agent run error") + + type(mock_chat_client_agent).run = run_with_error + + decorated_class = use_agent_telemetry(type(mock_chat_client_agent)) + agent = decorated_class() + + with ( + patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings), + patch("agent_framework.telemetry.use_span") as mock_use_span, + ): + mock_span = Mock() + mock_use_span.return_value.__enter__.return_value = mock_span + mock_use_span.return_value.__exit__.return_value = None + + # Should raise the exception and call error handler + with pytest.raises(RuntimeError, match="Agent run error"): + await agent.run("Test message") + + # Verify error was recorded + # Check that both error attributes were set on the span + mock_span.set_attribute.assert_called_once_with( + GenAIAttributes.ERROR_TYPE.value, str(type(RuntimeError("Agent run error"))) + ) + mock_span.set_status.assert_called_once_with(StatusCode.ERROR, repr(RuntimeError("Agent run error"))) diff --git a/python/samples/getting_started/telemetry/agent.py b/python/samples/getting_started/telemetry/agent.py new file mode 100644 index 0000000000..3cffec3301 --- /dev/null +++ b/python/samples/getting_started/telemetry/agent.py @@ -0,0 +1,190 @@ +# Copyright (c) Microsoft. All rights reserved. +# type: ignore +import asyncio +import logging +from random import randint +from typing import Annotated + +from agent_framework import ChatClientAgent +from agent_framework.openai import OpenAIChatClient +from azure.monitor.opentelemetry import configure_azure_monitor +from opentelemetry import trace +from opentelemetry._logs import set_logger_provider +from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.metrics import set_meter_provider +from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader +from opentelemetry.sdk.metrics.view import DropAggregation, View +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter +from opentelemetry.semconv.attributes import service_attributes +from opentelemetry.trace import SpanKind, set_tracer_provider +from pydantic import Field +from pydantic_settings import BaseSettings + + +class TelemetrySampleSettings(BaseSettings): + """Settings for the telemetry sample application. + + Optional settings are: + - connection_string: str - The connection string for the Application Insights resource. + This value can be found in the Overview section when examining + your resource from the Azure portal. + (Env var CONNECTION_STRING) + - otlp_endpoint: str - The OTLP endpoint to send telemetry data to. + Depending on the exporter used, you may find this value in different places. + (Env var OTLP_ENDPOINT) + + If no connection string or OTLP endpoint is provided, the telemetry data will be + exported to the console. + """ + + connection_string: str | None = None + otlp_endpoint: str | None = None + + +# Load settings +settings = TelemetrySampleSettings() + +# Create a resource to represent the service/sample +resource = Resource.create({service_attributes.SERVICE_NAME: "TelemetryExample"}) + +# Define the scenarios that can be run +SCENARIOS = ["ai_service", "kernel_function", "auto_function_invocation", "all"] + +if settings.connection_string: + configure_azure_monitor( + connection_string=settings.connection_string, enable_live_metrics=True, logger_name="agent_framework" + ) + + +def set_up_logging(): + class LogFilter(logging.Filter): + """A filter to not process records from several subpackages.""" + + # These are the namespaces that we want to exclude from logging for the purposes of this demo. + namespaces_to_exclude: list[str] = [ + "httpx", + "openai", + ] + + def filter(self, record): + return not any([record.name.startswith(namespace) for namespace in self.namespaces_to_exclude]) + + exporters = [] + if settings.otlp_endpoint: + exporters.append(OTLPLogExporter(endpoint=settings.otlp_endpoint)) + if not exporters: + exporters.append(ConsoleLogExporter()) + + # Create and set a global logger provider for the application. + logger_provider = LoggerProvider(resource=resource) + # Log processors are initialized with an exporter which is responsible + # for sending the telemetry data to a particular backend. + for log_exporter in exporters: + logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter)) + # Sets the global default logger provider + set_logger_provider(logger_provider) + + # Create a logging handler to write logging records, in OTLP format, to the exporter. + handler = LoggingHandler() + handler.addFilter(LogFilter()) + # Attach the handler to the root logger. `getLogger()` with no arguments returns the root logger. + # Events from all child loggers will be processed by this handler. + logger = logging.getLogger() + logger.addHandler(handler) + # Set the logging level to INFO. + logger.setLevel(logging.INFO) + + +def set_up_tracing(): + exporters = [] + if settings.otlp_endpoint: + exporters.append(OTLPSpanExporter(endpoint=settings.otlp_endpoint)) + if not exporters: + exporters.append(ConsoleSpanExporter()) + + # Initialize a trace provider for the application. This is a factory for creating tracers. + tracer_provider = TracerProvider(resource=resource) + # Span processors are initialized with an exporter which is responsible + # for sending the telemetry data to a particular backend. + for exporter in exporters: + tracer_provider.add_span_processor(BatchSpanProcessor(exporter)) + # Sets the global default tracer provider + set_tracer_provider(tracer_provider) + + +def set_up_metrics(): + exporters = [] + if settings.otlp_endpoint: + exporters.append(OTLPMetricExporter(endpoint=settings.otlp_endpoint)) + if not exporters: + exporters.append(ConsoleMetricExporter()) + + # Initialize a metric provider for the application. This is a factory for creating meters. + metric_readers = [ + PeriodicExportingMetricReader(metric_exporter, export_interval_millis=5000) for metric_exporter in exporters + ] + meter_provider = MeterProvider( + metric_readers=metric_readers, + resource=resource, + views=[ + # Dropping all instrument names except for those starting with "agent_framework" + View(instrument_name="*", aggregation=DropAggregation()), + View(instrument_name="agent_framework*"), + ], + ) + # Sets the global default meter provider + set_meter_provider(meter_provider) + + +async def get_weather( + location: Annotated[str, Field(description="The location to get the weather for.")], +) -> str: + """Get the weather for a given location.""" + await asyncio.sleep(randint(0, 10) / 10.0) # Simulate a network call + conditions = ["sunny", "cloudy", "rainy", "stormy"] + return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C." + + +async def main(): + # Set up the providers + # This must be done before any other telemetry calls + set_up_logging() + set_up_tracing() + set_up_metrics() + + tracer = trace.get_tracer("agent_framework") + with tracer.start_as_current_span("Scenario: Agent Chat", kind=SpanKind.CLIENT) as current_span: + print("Running scenario: Agent Chat") + print("Welcome to the chat, type 'exit' to quit.") + agent = ChatClientAgent( + chat_client=OpenAIChatClient(), + tools=get_weather, + name="WeatherAgent", + instructions="You are a weather assistant.", + ) + thread = agent.get_new_thread() + message = input("User: ") + try: + while message.lower() != "exit": + print(f"{agent.display_name}: ", end="") + async for update in agent.run_streaming( + message, + thread=thread, + ): + if update.text: + print(update.text, end="") + message = input("\nUser: ") + except Exception as e: + current_span.record_exception(e) + print(f"\nError running interactive chat: {e}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/getting_started/telemetry/interactive.py b/python/samples/getting_started/telemetry/interactive.py index f5209e7c75..88dd294cde 100644 --- a/python/samples/getting_started/telemetry/interactive.py +++ b/python/samples/getting_started/telemetry/interactive.py @@ -23,7 +23,7 @@ from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter from opentelemetry.semconv.attributes import service_attributes -from opentelemetry.trace import set_tracer_provider +from opentelemetry.trace import SpanKind, set_tracer_provider from pydantic import Field from pydantic_settings import BaseSettings @@ -98,7 +98,7 @@ def set_up_logging(): # Events from all child loggers will be processed by this handler. logger = logging.getLogger() logger.addHandler(handler) - # Set the logging level to NOTSET to allow all records to be processed by the handler. + # Set the logging level to WARNING, this will not log detailed events to the logger. logger.setLevel(logging.WARNING) @@ -159,7 +159,7 @@ async def main(): set_up_metrics() tracer = trace.get_tracer("agent_framework") - with tracer.start_as_current_span("Scenario: Interactive Chat") as current_span: + with tracer.start_as_current_span("Scenario: Interactive Chat", kind=SpanKind.CLIENT) as current_span: print("Running scenario: Interactive Chat") print("Welcome to the chat, type 'exit' to quit.") client = OpenAIChatClient() diff --git a/python/samples/getting_started/telemetry/scenarios.py b/python/samples/getting_started/telemetry/scenarios.py index 18db00b57a..044260d8ca 100644 --- a/python/samples/getting_started/telemetry/scenarios.py +++ b/python/samples/getting_started/telemetry/scenarios.py @@ -24,7 +24,7 @@ from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter from opentelemetry.semconv.attributes import service_attributes -from opentelemetry.trace import set_tracer_provider +from opentelemetry.trace import SpanKind, set_tracer_provider from opentelemetry.trace.span import format_trace_id from pydantic import Field from pydantic_settings import BaseSettings @@ -170,7 +170,7 @@ async def run_chat_client(stream: bool = False) -> None: tracer = trace.get_tracer(__name__) with tracer.start_as_current_span( - "Scenario: Chat Client Stream" if stream else "Scenario: Chat Client" + "Scenario: Chat Client Stream" if stream else "Scenario: Chat Client", kind=SpanKind.CLIENT ) as current_span: print("Running scenario: Chat Client" if not stream else "Running scenario: Chat Client Stream") try: @@ -203,7 +203,7 @@ async def run_ai_function() -> None: """ tracer = trace.get_tracer(__name__) - with tracer.start_as_current_span("Scenario: AI Function") as current_span: + with tracer.start_as_current_span("Scenario: AI Function", kind=SpanKind.CLIENT) as current_span: print("Running scenario: AI Function") try: func = ai_function(get_weather) @@ -222,7 +222,7 @@ async def main(scenario: Literal["chat_client", "chat_client_stream", "ai_functi set_up_metrics() tracer = trace.get_tracer("agent_framework") - with tracer.start_as_current_span("Scenario's") as current_span: + with tracer.start_as_current_span("Scenario's", kind=SpanKind.CLIENT) as current_span: print(f"Trace ID: {format_trace_id(current_span.get_span_context().trace_id)}") # Scenarios where telemetry is collected in the SDK, from the most basic to the most complex.