Python telemetry (#223)

* initial work on telemetry

* moved tool operation const

* missing quotes

* working otel with samples

* updated readme and other assets

* added tests

* added tests

* small updates

* updated genaiattributes docs

* updated tests

* additional warning

* cleanup of tests
This commit is contained in:
Eduard van Valkenburg
2025-07-28 09:33:42 +02:00
committed by GitHub
Unverified
parent 3ee9dddfa2
commit 0ce8eb1e2f
27 changed files with 2197 additions and 153 deletions
+2
View File
@@ -177,3 +177,5 @@ cython_debug/
# Visual Studio 2015/2017 cache/options directory
.vs/
**/.user/**
-20
View File
@@ -1,20 +0,0 @@
{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get the weather for a given location.",
"parameters": {
"properties": {
"location": {
"title": "Location",
"type": "string"
}
},
"required": [
"location"
],
"title": "get_weather_input",
"type": "object"
}
}
}
@@ -92,6 +92,9 @@ class FoundrySettings(AFBaseSettings):
@use_tool_calling
class FoundryChatClient(ChatClientBase):
"""Azure AI Foundry Chat client."""
MODEL_PROVIDER_NAME: ClassVar[str] = "azure_ai_foundry" # type: ignore[reportIncompatibleVariableOverride]
client: AIProjectClient = Field(...)
credential: AsyncTokenCredential | None = Field(...)
agent_id: str | None = Field(default=None)
@@ -25,11 +25,6 @@ from ._types import (
TInput = TypeVar("TInput", contravariant=True)
TEmbedding = TypeVar("TEmbedding")
TInnerGetResponse = TypeVar("TInnerGetResponse", bound=Callable[..., Awaitable[ChatResponse]])
TInnerGetStreamingResponse = TypeVar(
"TInnerGetStreamingResponse", bound=Callable[..., AsyncIterable[ChatResponseUpdate]]
)
TChatClientBase = TypeVar("TChatClientBase", bound="ChatClientBase")
logger = get_logger()
@@ -64,7 +59,7 @@ async def _auto_invoke_function(
args = tool.input_model.model_validate(merged_args)
exception = None
try:
function_result = await tool.invoke(arguments=args)
function_result = await tool.invoke(arguments=args, tool_call_id=function_call_content.call_id)
except Exception as ex:
exception = ex
function_result = None
@@ -87,7 +82,9 @@ def ai_function_to_json_schema_spec(function: AIFunction[BaseModel, Any]) -> dic
}
def _tool_call_non_streaming(func: TInnerGetResponse) -> TInnerGetResponse:
def _tool_call_non_streaming(
func: Callable[..., Awaitable["ChatResponse"]],
) -> Callable[..., Awaitable["ChatResponse"]]:
"""Decorate the internal _inner_get_response method to enable tool calls."""
@wraps(func)
@@ -153,10 +150,12 @@ def _tool_call_non_streaming(func: TInnerGetResponse) -> TInnerGetResponse:
response.messages.insert(0, msg)
return response
return wrapper # type: ignore[reportReturnType, return-value]
return wrapper
def _tool_call_streaming(func: TInnerGetStreamingResponse) -> TInnerGetStreamingResponse:
def _tool_call_streaming(
func: Callable[..., AsyncIterable["ChatResponseUpdate"]],
) -> Callable[..., AsyncIterable["ChatResponseUpdate"]]:
"""Decorate the internal _inner_get_response method to enable tool calls."""
@wraps(func)
@@ -218,7 +217,7 @@ def _tool_call_streaming(func: TInnerGetStreamingResponse) -> TInnerGetStreaming
async for update in func(self, messages=messages, chat_options=chat_options, **kwargs):
yield update
return wrapper # type: ignore[reportReturnType, return-value]
return wrapper
def use_tool_calling(cls: type[TChatClientBase]) -> type[TChatClientBase]:
@@ -382,6 +381,9 @@ class ChatClient(Protocol):
class ChatClientBase(AFBaseModel, ABC):
"""Base class for chat clients."""
MODEL_PROVIDER_NAME: str = "unknown"
# This is used for OTel setup, should be overridden in subclasses
def _prepare_messages(
self, messages: str | ChatMessage | list[str] | list[ChatMessage]
) -> MutableSequence[ChatMessage]:
@@ -632,6 +634,14 @@ class ChatClientBase(AFBaseModel, ABC):
else:
chat_options.tool_choice = chat_tool_mode.mode
def service_url(self) -> str | None:
"""Get the URL of the service.
Override this in the subclass to return the proper URL.
If the service does not have a URL, return None.
"""
return None
# region: Embedding Client
+42 -4
View File
@@ -3,10 +3,19 @@
import inspect
from collections.abc import Awaitable, Callable
from functools import wraps
from time import perf_counter
from typing import Any, Generic, Protocol, TypeVar, runtime_checkable
from opentelemetry import metrics, trace
from pydantic import BaseModel, create_model
from ._logging import get_logger
from .telemetry import GenAIAttributes, start_as_current_span
tracer: trace.Tracer = trace.get_tracer("agent_framework")
meter: metrics.Meter = metrics.get_meter_provider().get_meter("agent_framework")
logger = get_logger()
__all__ = ["AIFunction", "AITool", "HostedCodeInterpreterTool", "ai_function"]
@@ -65,6 +74,11 @@ class AIFunction(AITool, Generic[ArgsT, ReturnT]):
self.input_model = input_model
self.additional_properties: dict[str, Any] | None = kwargs
self._func = func
self.invocation_duration_histogram = meter.create_histogram(
"agent_framework.function.invocation.duration",
unit="s",
description="Measures the duration of a function's execution",
)
def parameters(self) -> dict[str, Any]:
"""Return the parameter json schemas of the input model."""
@@ -89,14 +103,38 @@ class AIFunction(AITool, Generic[ArgsT, ReturnT]):
arguments: A Pydantic model instance containing the arguments for the function.
kwargs: keyword arguments to pass to the function, will not be used if `args` is provided.
"""
tool_call_id = kwargs.pop("tool_call_id", None)
if arguments is not None:
if not isinstance(arguments, self.input_model):
raise TypeError(f"Expected {self.input_model.__name__}, got {type(arguments).__name__}")
kwargs = arguments.model_dump(exclude_none=True)
res = self.__call__(**kwargs)
if inspect.isawaitable(res):
return await res
return res
logger.info(f"Function name: {self.name}")
logger.debug(f"Function arguments: {kwargs}")
with start_as_current_span(
tracer, self, metadata={"tool_call_id": tool_call_id, "kwargs": kwargs}
) as current_span:
attributes: dict[str, Any] = {
GenAIAttributes.MEASUREMENT_FUNCTION_TAG_NAME.value: self.name,
GenAIAttributes.TOOL_CALL_ID.value: tool_call_id,
}
starting_time_stamp = perf_counter()
try:
res = self.__call__(**kwargs)
result = await res if inspect.isawaitable(res) else res
logger.info(f"Function {self.name} succeeded.")
logger.debug(f"Function result: {result or 'None'}")
return result # type: ignore[reportReturnType]
except Exception as exception:
attributes[GenAIAttributes.ERROR_TYPE.value] = type(exception).__name__
current_span.record_exception(exception)
current_span.set_attribute(GenAIAttributes.ERROR_TYPE.value, type(exception).__name__)
current_span.set_status(trace.StatusCode.ERROR, description=str(exception))
logger.error(f"Function failed. Error: {exception}")
raise
finally:
duration = perf_counter() - starting_time_stamp
self.invocation_duration_histogram.record(duration, attributes=attributes)
logger.info("Function completed. Duration: %fs", duration)
def ai_function(
@@ -204,7 +204,11 @@ def _process_update(
) -> None:
"""Processes a single update and modifies the response in place."""
is_new_message = False
if not response.messages or (update.message_id and response.messages[-1].message_id != update.message_id):
if (
not response.messages
or (update.message_id and response.messages[-1].message_id != update.message_id)
or (update.role and response.messages[-1].role != update.role)
):
is_new_message = True
if is_new_message:
@@ -4,7 +4,7 @@ import json
from collections.abc import AsyncIterable, Mapping, MutableSequence, Sequence
from datetime import datetime
from itertools import chain
from typing import Any, cast
from typing import Any, ClassVar, cast
from openai import AsyncOpenAI, AsyncStream
from openai.types import CompletionUsage
@@ -29,21 +29,19 @@ from .._types import (
UsageDetails,
)
from ..exceptions import ServiceInitializationError, ServiceInvalidResponseError
from ..telemetry import use_telemetry
from ._shared import OpenAIConfigBase, OpenAIHandler, OpenAIModelTypes, OpenAISettings
__all__ = ["OpenAIChatClient"]
# region OpenAIChatClientBase
# Implements agent_framework.ChatClient protocol, through ChatClientBase
# region Base Client
@use_telemetry
@use_tool_calling
class OpenAIChatClientBase(OpenAIHandler, ChatClientBase):
"""OpenAI Chat completion class."""
# region Overriding base class methods
# most of the methods are overridden from the ChatClientBase class, otherwise it is mentioned
MODEL_PROVIDER_NAME: ClassVar[str] = "openai" # type: ignore[reportIncompatibleVariableOverride, misc]
async def _inner_get_response(
self,
@@ -100,8 +98,6 @@ class OpenAIChatClientBase(OpenAIHandler, ChatClientBase):
for choice in chunk.choices
)
# endregion
# region content creation
def _create_chat_message_content(
@@ -220,34 +216,34 @@ class OpenAIChatClientBase(OpenAIHandler, ChatClientBase):
# Flatten the list of lists into a single list
return list(chain.from_iterable(list_of_list))
# endregion
# region Parsers
def _openai_chat_message_parser(self, message: ChatMessage) -> list[dict[str, Any]]:
"""Parse a chat message into the openai format."""
all_messages: list[dict[str, Any]] = []
args: dict[str, Any] = {
"role": message.role.value if isinstance(message.role, ChatRole) else message.role,
}
if message.additional_properties:
args["metadata"] = message.additional_properties
for content in message.contents:
args: dict[str, Any] = {
"role": message.role.value if isinstance(message.role, ChatRole) else message.role,
}
if message.additional_properties:
args["metadata"] = message.additional_properties
match content:
case FunctionResultContent():
new_args = args.copy()
new_args["tool_call_id"] = content.call_id
new_args["content"] = content.result
all_messages.append(new_args)
case FunctionCallContent():
function_call = self._openai_content_parser(content)
if "tool_calls" not in args:
args["tool_calls"] = []
args["tool_calls"].append(function_call) # type: ignore
if all_messages and "tool_calls" in all_messages[-1]:
# If the last message already has tool calls, append to it
all_messages[-1]["tool_calls"].append(self._openai_content_parser(content))
else:
args["tool_calls"] = [self._openai_content_parser(content)] # type: ignore
case FunctionResultContent():
args["tool_call_id"] = content.call_id
args["content"] = content.result
case _:
if "content" not in args:
args["content"] = []
# this is a list to allow multi-modal content
args["content"].append(self._openai_content_parser(content)) # type: ignore
if "content" in args or "tool_calls" in args:
all_messages.append(args)
if "content" in args or "tool_calls" in args:
all_messages.append(args)
return all_messages
def _openai_content_parser(self, content: AIContents) -> dict[str, Any]:
@@ -268,10 +264,16 @@ class OpenAIChatClientBase(OpenAIHandler, ChatClientBase):
case _:
return content.model_dump(exclude_none=True)
def service_url(self) -> str | None:
"""Get the URL of the service.
# endregion
Override this in the subclass to return the proper URL.
If the service does not have a URL, return None.
"""
return str(self.client.base_url) if self.client else None
# region OpenAIChatClient
# region Public client
class OpenAIChatClient(OpenAIConfigBase, OpenAIChatClientBase):
+433 -20
View File
@@ -1,37 +1,138 @@
# Copyright (c) Microsoft. All rights reserved.
import functools
import json
import logging
import os
from importlib.metadata import PackageNotFoundError, version
from typing import Any, Final
from collections.abc import AsyncIterable, Awaitable, Callable, MutableSequence
from enum import Enum
from typing import TYPE_CHECKING, Any, ClassVar, Final, TypeVar
try:
version_info = version("agent-framework")
except PackageNotFoundError:
version_info = "dev"
from opentelemetry import trace
from opentelemetry.trace import Span, StatusCode, get_tracer, use_span
# Note that if this environment variable does not exist, telemetry is enabled.
TELEMETRY_DISABLED_ENV_VAR = "AZURE_TELEMETRY_DISABLED"
IS_TELEMETRY_ENABLED = os.environ.get(TELEMETRY_DISABLED_ENV_VAR, "false").lower() not in ["true", "1"]
from . import __version__ as version_info
from ._logging import get_logger
from ._pydantic import AFBaseSettings
APP_INFO = (
{
"agent-framework-version": f"python/{version_info}",
}
if IS_TELEMETRY_ENABLED
else None
)
USER_AGENT_KEY: Final[str] = "User-Agent"
HTTP_USER_AGENT: Final[str] = "agent-framework-python"
AGENT_FRAMEWORK_USER_AGENT = f"{HTTP_USER_AGENT}/{version_info}"
if TYPE_CHECKING: # pragma: no cover
from opentelemetry.util._decorator import _AgnosticContextManager # type: ignore[reportPrivateUsage]
from ._clients import ChatClientBase
from ._tools import AIFunction
from ._types import ChatMessage, ChatOptions, ChatResponse, ChatResponseUpdate
TChatClientBase = TypeVar("TChatClientBase", bound="ChatClientBase")
tracer = get_tracer("agent_framework")
logger = get_logger()
__all__ = [
"AGENT_FRAMEWORK_USER_AGENT",
"APP_INFO",
"USER_AGENT_KEY",
"prepend_agent_framework_to_user_agent",
"use_telemetry",
]
# We're recording multiple events for the chat history, some of them are emitted within (hundreds of)
# nanoseconds of each other. The default timestamp resolution is not high enough to guarantee unique
# timestamps for each message. Also Azure Monitor truncates resolution to microseconds and some other
# backends truncate to milliseconds.
#
# But we need to give users a way to restore chat message order, so we're incrementing the timestamp
# by 1 microsecond for each message.
#
# This is a workaround, we'll find a generic and better solution - see
# https://github.com/open-telemetry/semantic-conventions/issues/1701
class ChatMessageListTimestampFilter(logging.Filter):
"""A filter to increment the timestamp of INFO logs by 1 microsecond."""
INDEX_KEY: ClassVar[str] = "CHAT_MESSAGE_INDEX"
def filter(self, record: logging.LogRecord) -> bool:
"""Increment the timestamp of INFO logs by 1 microsecond."""
if hasattr(record, self.INDEX_KEY):
idx = getattr(record, self.INDEX_KEY)
record.created += idx * 1e-6
return True
# Creates a tracer from the global tracer provider
logger.addFilter(ChatMessageListTimestampFilter())
class GenAIAttributes(str, Enum):
"""Enum to capture the attributes used in OpenTelemetry for Generative AI.
Based on: https://opentelemetry.io/docs/concepts/semantic-conventions/
Should always be used, with `.value` to get the string representation.
"""
OPERATION = "gen_ai.operation.name"
SYSTEM = "gen_ai.system"
ERROR_TYPE = "error.type"
MODEL = "gen_ai.request.model"
SEED = "gen_ai.request.seed"
PORT = "server.port"
ENCODING_FORMATS = "gen_ai.request.encoding_formats"
FREQUENCY_PENALTY = "gen_ai.request.frequency_penalty"
MAX_TOKENS = "gen_ai.request.max_tokens"
STOP_SEQUENCES = "gen_ai.request.stop_sequences"
TEMPERATURE = "gen_ai.request.temperature"
TOP_K = "gen_ai.request.top_k"
TOP_P = "gen_ai.request.top_p"
FINISH_REASON = "gen_ai.response.finish_reason"
RESPONSE_ID = "gen_ai.response.id"
INPUT_TOKENS = "gen_ai.usage.input_tokens"
OUTPUT_TOKENS = "gen_ai.usage.output_tokens"
TOOL_CALL_ID = "gen_ai.tool.call.id"
TOOL_DESCRIPTION = "gen_ai.tool.description"
TOOL_NAME = "gen_ai.tool.name"
ADDRESS = "server.address"
# Activity events
EVENT_NAME = "event.name"
SYSTEM_MESSAGE = "gen_ai.system.message"
USER_MESSAGE = "gen_ai.user.message"
ASSISTANT_MESSAGE = "gen_ai.assistant.message"
TOOL_MESSAGE = "gen_ai.tool.message"
CHOICE = "gen_ai.choice"
PROMPT = "gen_ai.prompt"
# Operation names
CHAT_COMPLETION_OPERATION = "chat.completions"
CHAT_STREAMING_COMPLETION_OPERATION = "chat.streaming_completions"
TOOL_EXECUTION_OPERATION = "execute_tool"
# Agent Framework specific attributes
MEASUREMENT_FUNCTION_TAG_NAME = "agent_framework.function.name"
ROLE_EVENT_MAP = {
"system": GenAIAttributes.SYSTEM_MESSAGE.value,
"user": GenAIAttributes.USER_MESSAGE.value,
"assistant": GenAIAttributes.ASSISTANT_MESSAGE.value,
"tool": GenAIAttributes.TOOL_MESSAGE.value,
}
# Note that if this environment variable does not exist, telemetry is enabled.
TELEMETRY_DISABLED_ENV_VAR = "AZURE_TELEMETRY_DISABLED"
IS_TELEMETRY_ENABLED = os.environ.get(TELEMETRY_DISABLED_ENV_VAR, "false").lower() not in ["true", "1"]
APP_INFO = (
{
"agent-framework-version": f"python/{version_info}", # type: ignore[has-type]
}
if IS_TELEMETRY_ENABLED
else None
)
USER_AGENT_KEY: Final[str] = "User-Agent"
HTTP_USER_AGENT: Final[str] = "agent-framework-python"
AGENT_FRAMEWORK_USER_AGENT = f"{HTTP_USER_AGENT}/{version_info}" # type: ignore[has-type]
def prepend_agent_framework_to_user_agent(headers: dict[str, Any]) -> dict[str, Any]:
"""Prepend "agent-framework" to the User-Agent in the headers.
@@ -50,4 +151,316 @@ def prepend_agent_framework_to_user_agent(headers: dict[str, Any]) -> dict[str,
return headers
__all__ = ["AGENT_FRAMEWORK_USER_AGENT", "APP_INFO", "USER_AGENT_KEY", "prepend_agent_framework_to_user_agent"]
class ModelDiagnosticSettings(AFBaseSettings):
"""Settings for model diagnostics.
The settings are first loaded from environment variables with
the prefix 'AGENT_FRAMEWORK_GENAI_'.
If the environment variables are not found, the settings can
be loaded from a .env file with the encoding 'utf-8'.
If the settings are not found in the .env file, the settings
are ignored; however, validation will fail alerting that the
settings are missing.
Warning:
Sensitive events should only be enabled on test and development environments.
Required settings for prefix 'AGENT_FRAMEWORK_GENAI_' are:
- enable_otel_diagnostics: bool - Enable OpenTelemetry diagnostics. Default is False.
(Env var AGENT_FRAMEWORK_GENAI_ENABLE_OTEL_DIAGNOSTICS)
- enable_otel_diagnostics_sensitive: bool - Enable OpenTelemetry sensitive events. Default is False.
(Env var AGENT_FRAMEWORK_GENAI_ENABLE_OTEL_DIAGNOSTICS_SENSITIVE)
"""
env_prefix: ClassVar[str] = "AGENT_FRAMEWORK_GENAI_"
enable_otel_diagnostics: bool = False
enable_otel_diagnostics_sensitive: bool = False
@property
def ENABLED(self) -> bool:
"""Check if model diagnostics are enabled.
Model diagnostics are enabled if either diagnostic is enabled or diagnostic with sensitive events is enabled.
"""
return self.enable_otel_diagnostics or self.enable_otel_diagnostics_sensitive
@property
def SENSITIVE_EVENTS_ENABLED(self) -> bool:
"""Check if sensitive events are enabled.
Sensitive events are enabled if the diagnostic with sensitive events is enabled.
"""
return self.enable_otel_diagnostics_sensitive
MODEL_DIAGNOSTICS_SETTINGS = ModelDiagnosticSettings()
def start_as_current_span(
tracer: trace.Tracer,
function: "AIFunction[Any, Any]",
metadata: dict[str, Any] | None = None,
) -> "_AgnosticContextManager[Span]":
"""Starts a span for the given function using the provided tracer.
Args:
tracer: The OpenTelemetry tracer to use.
function: The function for which to start the span.
metadata: Optional metadata to include in the span attributes.
Returns:
trace.Span: The started span as a context manager.
"""
attributes = {
GenAIAttributes.OPERATION.value: GenAIAttributes.TOOL_EXECUTION_OPERATION.value,
GenAIAttributes.TOOL_NAME.value: function.name,
}
tool_call_id = metadata.get("tool_call_id", None) if metadata else None
if tool_call_id:
attributes[GenAIAttributes.TOOL_CALL_ID.value] = tool_call_id
if function.description:
attributes[GenAIAttributes.TOOL_DESCRIPTION.value] = function.description
return tracer.start_as_current_span(
f"{GenAIAttributes.TOOL_EXECUTION_OPERATION.value} {function.name}", attributes=attributes
)
def _trace_chat_get_response(
completion_func: Callable[..., Awaitable["ChatResponse"]],
) -> Callable[..., Awaitable["ChatResponse"]]:
"""Decorator to trace chat completion activities.
Args:
completion_func: The function to trace.
"""
@functools.wraps(completion_func)
async def wrap_inner_get_response(
self: "ChatClientBase",
*,
messages: MutableSequence["ChatMessage"],
chat_options: "ChatOptions",
**kwargs: Any,
) -> "ChatResponse":
if not MODEL_DIAGNOSTICS_SETTINGS.ENABLED:
# If model diagnostics are not enabled, just return the completion
return await completion_func(
self,
messages=messages,
chat_options=chat_options,
**kwargs,
)
with use_span(
_get_chat_response_span(
GenAIAttributes.CHAT_COMPLETION_OPERATION.value,
getattr(self, "ai_model_id", chat_options.ai_model_id or "unknown"),
self.MODEL_PROVIDER_NAME,
self.service_url() if hasattr(self, "service_url") else None,
chat_options,
),
end_on_exit=True,
) as current_span:
_set_chat_response_input(self.MODEL_PROVIDER_NAME, messages)
try:
response = await completion_func(self, messages=messages, chat_options=chat_options, **kwargs)
_set_chat_response_output(current_span, response, self.MODEL_PROVIDER_NAME)
return response
except Exception as exception:
_set_chat_response_error(current_span, exception)
raise
# Mark the wrapper decorator as a chat completion decorator
wrap_inner_get_response.__model_diagnostics_chat_client__ = True # type: ignore
return wrap_inner_get_response
def _trace_chat_get_streaming_response(
completion_func: Callable[..., AsyncIterable["ChatResponseUpdate"]],
) -> Callable[..., AsyncIterable["ChatResponseUpdate"]]:
"""Decorator to trace streaming chat completion activities.
Args:
completion_func: The function to trace.
"""
@functools.wraps(completion_func)
async def wrap_inner_get_streaming_response(
self: "ChatClientBase", *, messages: MutableSequence["ChatMessage"], chat_options: "ChatOptions", **kwargs: Any
) -> AsyncIterable["ChatResponseUpdate"]:
if not MODEL_DIAGNOSTICS_SETTINGS.ENABLED:
# If model diagnostics are not enabled, just return the completion
async for streaming_chat_message_contents in completion_func(
self, messages=messages, chat_options=chat_options, **kwargs
):
yield streaming_chat_message_contents
return
from ._types import ChatResponse
all_updates: list["ChatResponseUpdate"] = []
with use_span(
_get_chat_response_span(
GenAIAttributes.CHAT_STREAMING_COMPLETION_OPERATION.value,
getattr(self, "ai_model_id", chat_options.ai_model_id or "unknown"),
self.MODEL_PROVIDER_NAME,
self.service_url() if hasattr(self, "service_url") else None,
chat_options,
),
end_on_exit=True,
) as current_span:
_set_chat_response_input(self.MODEL_PROVIDER_NAME, messages)
try:
async for response in completion_func(self, messages=messages, chat_options=chat_options, **kwargs):
all_updates.append(response)
yield response
all_messages_flattened = ChatResponse.from_chat_response_updates(all_updates)
_set_chat_response_output(current_span, all_messages_flattened, self.MODEL_PROVIDER_NAME)
except Exception as exception:
_set_chat_response_error(current_span, exception)
raise
# Mark the wrapper decorator as a streaming chat completion decorator
wrap_inner_get_streaming_response.__model_diagnostics_streaming_chat_completion__ = True # type: ignore
return wrap_inner_get_streaming_response
def use_telemetry(cls: type[TChatClientBase]) -> type[TChatClientBase]:
"""Class decorator that enables telemetry for a chat client.
Remarks:
This only works on classes that derive from ChatClientBase
and the _inner_get_response
and _inner_get_streaming_response methods.
It also relies on the presence of the MODEL_PROVIDER_NAME class variable.
```
"""
if inner_response := getattr(cls, "_inner_get_response", None):
cls._inner_get_response = _trace_chat_get_response(inner_response) # type: ignore
if inner_streaming_response := getattr(cls, "_inner_get_streaming_response", None):
cls._inner_get_streaming_response = _trace_chat_get_streaming_response(inner_streaming_response) # type: ignore
return cls
def _get_chat_response_span(
operation_name: str,
model_name: str,
model_provider: str,
service_url: str | None,
chat_options: "ChatOptions",
) -> Span:
"""Start a text or chat completion span for a given model.
Note that `start_span` doesn't make the span the current span.
Use `use_span` to make it the current span as a context manager.
"""
span = tracer.start_span(f"{operation_name} {model_name}")
# Set attributes on the span
span.set_attributes({
GenAIAttributes.OPERATION.value: operation_name,
GenAIAttributes.SYSTEM.value: model_provider,
GenAIAttributes.MODEL.value: model_name,
})
if service_url:
span.set_attribute(GenAIAttributes.ADDRESS.value, service_url)
if chat_options.seed is not None:
span.set_attribute(GenAIAttributes.SEED.value, chat_options.seed)
if chat_options.frequency_penalty is not None:
span.set_attribute(GenAIAttributes.FREQUENCY_PENALTY.value, chat_options.frequency_penalty)
if chat_options.max_tokens is not None:
span.set_attribute(GenAIAttributes.MAX_TOKENS.value, chat_options.max_tokens)
if chat_options.stop is not None:
span.set_attribute(GenAIAttributes.STOP_SEQUENCES.value, chat_options.stop)
if chat_options.temperature is not None:
span.set_attribute(GenAIAttributes.TEMPERATURE.value, chat_options.temperature)
if chat_options.top_p is not None:
span.set_attribute(GenAIAttributes.TOP_P.value, chat_options.top_p)
if "top_k" in chat_options.additional_properties:
span.set_attribute(GenAIAttributes.TOP_K.value, chat_options.additional_properties["top_k"])
if "encoding_formats" in chat_options.additional_properties:
span.set_attribute(
GenAIAttributes.ENCODING_FORMATS.value, chat_options.additional_properties["encoding_formats"]
)
return span
def _set_chat_response_input(
model_provider: str,
messages: MutableSequence["ChatMessage"],
) -> None:
"""Set the input for a chat response.
The logs will be associated to the current span.
"""
if MODEL_DIAGNOSTICS_SETTINGS.SENSITIVE_EVENTS_ENABLED:
for idx, message in enumerate(messages):
event_name = ROLE_EVENT_MAP.get(message.role.value)
if event_name:
logger.info(
message.model_dump_json(exclude_none=True),
extra={
GenAIAttributes.EVENT_NAME.value: event_name,
GenAIAttributes.SYSTEM.value: model_provider,
ChatMessageListTimestampFilter.INDEX_KEY: idx,
},
)
def _set_chat_response_output(
current_span: Span,
response: "ChatResponse",
model_provider: str,
) -> None:
"""Set the response for a given span."""
first_completion = response.messages[0]
# Set the response ID
response_id = (
first_completion.additional_properties.get("id") if first_completion.additional_properties is not None else None
)
if response_id:
current_span.set_attribute(GenAIAttributes.RESPONSE_ID.value, response_id)
# Set the finish reason
finish_reason = response.finish_reason
if finish_reason:
current_span.set_attribute(GenAIAttributes.FINISH_REASON.value, finish_reason.value)
# Set usage attributes
usage = response.usage_details
if usage:
if usage.input_token_count:
current_span.set_attribute(GenAIAttributes.INPUT_TOKENS.value, usage.input_token_count)
if usage.output_token_count:
current_span.set_attribute(GenAIAttributes.OUTPUT_TOKENS.value, usage.output_token_count)
# Set the completion event
if MODEL_DIAGNOSTICS_SETTINGS.SENSITIVE_EVENTS_ENABLED:
for completion in response.messages:
full_response: dict[str, Any] = {
"message": completion.model_dump(exclude_none=True),
}
full_response["index"] = response.response_id
logger.info(
json.dumps(full_response),
extra={
GenAIAttributes.EVENT_NAME.value: GenAIAttributes.CHOICE.value,
GenAIAttributes.SYSTEM.value: model_provider,
},
)
def _set_chat_response_error(span: Span, error: Exception) -> None:
"""Set an error for chat client responses."""
span.set_attribute(GenAIAttributes.ERROR_TYPE.value, str(type(error)))
span.set_status(StatusCode.ERROR, repr(error))
+2
View File
@@ -27,6 +27,8 @@ dependencies = [
"pydantic>=2.11.7",
"pydantic-settings>=2.10.1",
"typing-extensions>=4.14.0",
"opentelemetry-api ~= 1.24",
"opentelemetry-sdk ~= 1.24",
]
[project.optional-dependencies]
@@ -5,6 +5,7 @@ from pydantic import BaseModel
from pytest import fixture
from agent_framework import AITool, ChatMessage, ai_function
from agent_framework.telemetry import ModelDiagnosticSettings
@fixture(scope="function")
@@ -40,3 +41,19 @@ def ai_function_tool() -> AITool:
return x + y
return simple_function
@fixture
def model_diagnostic_settings(monkeypatch, request) -> ModelDiagnosticSettings:
"""Fixture to set environment variables for ModelDiagnosticSettings."""
enabled = getattr(request, "param", (None, None))[0]
sensitive = getattr(request, "param", (None, None))[1]
if enabled is None:
monkeypatch.delenv("AGENT_FRAMEWORK_GENAI_ENABLE_OTEL_DIAGNOSTICS", raising=False)
else:
monkeypatch.setenv("AGENT_FRAMEWORK_GENAI_ENABLE_OTEL_DIAGNOSTICS", str(enabled).lower())
if sensitive is None:
monkeypatch.delenv("AGENT_FRAMEWORK_GENAI_ENABLE_OTEL_DIAGNOSTICS_SENSITIVE", raising=False)
else:
monkeypatch.setenv("AGENT_FRAMEWORK_GENAI_ENABLE_OTEL_DIAGNOSTICS_SENSITIVE", str(sensitive).lower())
return ModelDiagnosticSettings(env_file_path="test.env")
@@ -0,0 +1,612 @@
# Copyright (c) Microsoft. All rights reserved.
import logging
from collections.abc import AsyncIterable, MutableSequence
from typing import Any
from unittest.mock import Mock, patch
import pytest
from agent_framework import (
ChatMessage,
ChatOptions,
ChatResponse,
ChatResponseUpdate,
ChatRole,
UsageDetails,
)
from agent_framework.telemetry import (
AGENT_FRAMEWORK_USER_AGENT,
ROLE_EVENT_MAP,
TELEMETRY_DISABLED_ENV_VAR,
USER_AGENT_KEY,
ChatMessageListTimestampFilter,
GenAIAttributes,
prepend_agent_framework_to_user_agent,
start_as_current_span,
use_telemetry,
)
# region Test constants
def test_telemetry_disabled_env_var():
"""Test that the telemetry disabled environment variable is correctly defined."""
assert TELEMETRY_DISABLED_ENV_VAR == "AZURE_TELEMETRY_DISABLED"
def test_user_agent_key():
"""Test that the user agent key is correctly defined."""
assert USER_AGENT_KEY == "User-Agent"
def test_agent_framework_user_agent_format():
"""Test that the agent framework user agent is correctly formatted."""
assert AGENT_FRAMEWORK_USER_AGENT.startswith("agent-framework-python/")
def test_app_info_when_telemetry_enabled():
"""Test that APP_INFO is set when telemetry is enabled."""
with patch("agent_framework.telemetry.IS_TELEMETRY_ENABLED", True):
import importlib
import agent_framework.telemetry
importlib.reload(agent_framework.telemetry)
from agent_framework.telemetry import APP_INFO
assert APP_INFO is not None
assert "agent-framework-version" in APP_INFO
assert APP_INFO["agent-framework-version"].startswith("python/")
def test_app_info_when_telemetry_disabled():
"""Test that APP_INFO is None when telemetry is disabled."""
# Test the logic directly since APP_INFO is set at module import time
with patch("agent_framework.telemetry.IS_TELEMETRY_ENABLED", False):
# Simulate the module's logic for APP_INFO
test_app_info = (
{
"agent-framework-version": "python/test",
}
if False # This simulates IS_TELEMETRY_ENABLED being False
else None
)
assert test_app_info is None
def test_role_event_map():
"""Test that ROLE_EVENT_MAP contains expected mappings."""
assert ROLE_EVENT_MAP["system"] == GenAIAttributes.SYSTEM_MESSAGE.value
assert ROLE_EVENT_MAP["user"] == GenAIAttributes.USER_MESSAGE.value
assert ROLE_EVENT_MAP["assistant"] == GenAIAttributes.ASSISTANT_MESSAGE.value
assert ROLE_EVENT_MAP["tool"] == GenAIAttributes.TOOL_MESSAGE.value
def test_enum_values():
"""Test that GenAIAttributes enum has expected values."""
assert GenAIAttributes.OPERATION.value == "gen_ai.operation.name"
assert GenAIAttributes.SYSTEM.value == "gen_ai.system"
assert GenAIAttributes.MODEL.value == "gen_ai.request.model"
assert GenAIAttributes.CHAT_COMPLETION_OPERATION.value == "chat.completions"
assert GenAIAttributes.CHAT_STREAMING_COMPLETION_OPERATION.value == "chat.streaming_completions"
assert GenAIAttributes.TOOL_EXECUTION_OPERATION.value == "execute_tool"
# region Test prepend_agent_framework_to_user_agent
def test_prepend_to_existing_user_agent():
"""Test prepending to existing User-Agent header."""
headers = {"User-Agent": "existing-agent/1.0"}
result = prepend_agent_framework_to_user_agent(headers)
assert "User-Agent" in result
assert result["User-Agent"].startswith("agent-framework-python/")
assert "existing-agent/1.0" in result["User-Agent"]
def test_prepend_to_empty_headers():
"""Test prepending to headers without User-Agent."""
headers = {"Content-Type": "application/json"}
result = prepend_agent_framework_to_user_agent(headers)
assert "User-Agent" in result
assert result["User-Agent"] == AGENT_FRAMEWORK_USER_AGENT
assert "Content-Type" in result
def test_prepend_to_empty_dict():
"""Test prepending to empty headers dict."""
headers = {}
result = prepend_agent_framework_to_user_agent(headers)
assert "User-Agent" in result
assert result["User-Agent"] == AGENT_FRAMEWORK_USER_AGENT
def test_modifies_original_dict():
"""Test that the function modifies the original headers dict."""
headers = {"Other-Header": "value"}
result = prepend_agent_framework_to_user_agent(headers)
assert result is headers # Same object
assert "User-Agent" in headers
# region ModelDiagnosticSettings tests
@pytest.mark.parametrize("model_diagnostic_settings", [(None, None)], indirect=True)
def test_default_values(model_diagnostic_settings):
"""Test default values for ModelDiagnosticSettings."""
assert not model_diagnostic_settings.ENABLED
assert not model_diagnostic_settings.SENSITIVE_EVENTS_ENABLED
@pytest.mark.parametrize("model_diagnostic_settings", [(False, False)], indirect=True)
def test_disabled(model_diagnostic_settings):
"""Test default values for ModelDiagnosticSettings."""
assert not model_diagnostic_settings.ENABLED
assert not model_diagnostic_settings.SENSITIVE_EVENTS_ENABLED
@pytest.mark.parametrize("model_diagnostic_settings", [(True, False)], indirect=True)
def test_non_sensitive_events_enabled(model_diagnostic_settings):
"""Test loading model_diagnostic_settings from environment variables."""
assert model_diagnostic_settings.ENABLED
assert not model_diagnostic_settings.SENSITIVE_EVENTS_ENABLED
@pytest.mark.parametrize("model_diagnostic_settings", [(True, True)], indirect=True)
def test_sensitive_events_enabled(model_diagnostic_settings):
"""Test loading model_diagnostic_settings from environment variables."""
assert model_diagnostic_settings.ENABLED
assert model_diagnostic_settings.SENSITIVE_EVENTS_ENABLED
@pytest.mark.parametrize("model_diagnostic_settings", [(False, True)], indirect=True)
def test_sensitive_events_enabled_only(model_diagnostic_settings):
"""Test loading sensitive events setting from environment.
But when sensitive events are enabled, diagnostics are also enabled.
"""
assert model_diagnostic_settings.ENABLED
assert model_diagnostic_settings.SENSITIVE_EVENTS_ENABLED
# region Test ChatMessageListTimestampFilter
def test_filter_without_index_key():
"""Test filter method when record doesn't have INDEX_KEY."""
log_filter = ChatMessageListTimestampFilter()
record = logging.LogRecord(
name="test", level=logging.INFO, pathname="", lineno=0, msg="test message", args=(), exc_info=None
)
original_created = record.created
result = log_filter.filter(record)
assert result is True
assert record.created == original_created
def test_filter_with_index_key():
"""Test filter method when record has INDEX_KEY."""
log_filter = ChatMessageListTimestampFilter()
record = logging.LogRecord(
name="test", level=logging.INFO, pathname="", lineno=0, msg="test message", args=(), exc_info=None
)
original_created = record.created
# Add the index key
setattr(record, ChatMessageListTimestampFilter.INDEX_KEY, 5)
result = log_filter.filter(record)
assert result is True
# Should increment by 5 microseconds (5 * 1e-6)
assert record.created == original_created + 5 * 1e-6
def test_index_key_constant():
"""Test that INDEX_KEY constant is correctly defined."""
assert ChatMessageListTimestampFilter.INDEX_KEY == "CHAT_MESSAGE_INDEX"
# region Test start_as_current_span
def test_start_span_basic():
"""Test starting a span with basic function info."""
mock_tracer = Mock()
mock_span = Mock()
mock_tracer.start_as_current_span.return_value = mock_span
# Create a mock function
mock_function = Mock()
mock_function.name = "test_function"
mock_function.description = "Test function description"
result = start_as_current_span(mock_tracer, mock_function)
assert result == mock_span
mock_tracer.start_as_current_span.assert_called_once()
call_args = mock_tracer.start_as_current_span.call_args
assert call_args[0][0] == "execute_tool test_function"
attributes = call_args[1]["attributes"]
assert attributes[GenAIAttributes.OPERATION.value] == GenAIAttributes.TOOL_EXECUTION_OPERATION.value
assert attributes[GenAIAttributes.TOOL_NAME.value] == "test_function"
assert attributes[GenAIAttributes.TOOL_DESCRIPTION.value] == "Test function description"
def test_start_span_with_metadata():
"""Test starting a span with metadata containing tool_call_id."""
mock_tracer = Mock()
mock_span = Mock()
mock_tracer.start_as_current_span.return_value = mock_span
mock_function = Mock()
mock_function.name = "test_function"
mock_function.description = "Test function"
metadata = {"tool_call_id": "test_call_123"}
_ = start_as_current_span(mock_tracer, mock_function, metadata)
call_args = mock_tracer.start_as_current_span.call_args
attributes = call_args[1]["attributes"]
assert attributes[GenAIAttributes.TOOL_CALL_ID.value] == "test_call_123"
def test_start_span_without_description():
"""Test starting a span when function has no description."""
mock_tracer = Mock()
mock_span = Mock()
mock_tracer.start_as_current_span.return_value = mock_span
mock_function = Mock()
mock_function.name = "test_function"
mock_function.description = None
start_as_current_span(mock_tracer, mock_function)
call_args = mock_tracer.start_as_current_span.call_args
attributes = call_args[1]["attributes"]
assert GenAIAttributes.TOOL_DESCRIPTION.value not in attributes
def test_start_span_empty_metadata():
"""Test starting a span with empty metadata."""
mock_tracer = Mock()
mock_span = Mock()
mock_tracer.start_as_current_span.return_value = mock_span
mock_function = Mock()
mock_function.name = "test_function"
mock_function.description = "Test function"
start_as_current_span(mock_tracer, mock_function, {})
call_args = mock_tracer.start_as_current_span.call_args
attributes = call_args[1]["attributes"]
assert GenAIAttributes.TOOL_CALL_ID.value not in attributes
# region Test use_telemetry decorator
def test_decorator_with_valid_class():
"""Test that decorator works with a valid ChatClientBase-like class."""
# Create a mock class with the required methods
class MockChatClient:
MODEL_PROVIDER_NAME = "test_provider"
async def _inner_get_response(self, *, messages, chat_options, **kwargs):
return Mock()
async def _inner_get_streaming_response(self, *, messages, chat_options, **kwargs):
async def gen():
yield Mock()
return gen()
# Apply the decorator
decorated_class = use_telemetry(MockChatClient)
# Check that the methods were wrapped
assert hasattr(decorated_class._inner_get_response, "__model_diagnostics_chat_client__")
assert hasattr(decorated_class._inner_get_streaming_response, "__model_diagnostics_streaming_chat_completion__")
def test_decorator_with_missing_methods():
"""Test that decorator handles classes missing required methods gracefully."""
class MockChatClient:
MODEL_PROVIDER_NAME = "test_provider"
# Apply the decorator - should not raise an error
decorated_class = use_telemetry(MockChatClient)
# Class should be returned unchanged
assert decorated_class is MockChatClient
def test_decorator_with_partial_methods():
"""Test decorator when only one method is present."""
class MockChatClient:
MODEL_PROVIDER_NAME = "test_provider"
async def _inner_get_response(self, *, messages, chat_options, **kwargs):
return Mock()
decorated_class = use_telemetry(MockChatClient)
# Only the present method should be wrapped
assert hasattr(decorated_class._inner_get_response, "__model_diagnostics_chat_client__")
assert not hasattr(decorated_class, "_inner_get_streaming_response")
# region Test telemetry decorator with mock client
@pytest.fixture
def mock_chat_client():
"""Create a mock chat client for testing."""
class MockChatClient:
MODEL_PROVIDER_NAME = "test_provider"
def __init__(self):
self.ai_model_id = "test-model"
def service_url(self):
return "https://test.example.com"
async def _inner_get_response(
self, *, messages: MutableSequence[ChatMessage], chat_options: ChatOptions, **kwargs: Any
):
return ChatResponse(
messages=[ChatMessage(role=ChatRole.ASSISTANT, text="Test response")],
usage_details=UsageDetails(input_token_count=10, output_token_count=20),
finish_reason=None,
)
async def _inner_get_streaming_response(
self, *, messages: MutableSequence[ChatMessage], chat_options: ChatOptions, **kwargs: Any
):
yield ChatResponseUpdate(text="Hello", role=ChatRole.ASSISTANT)
yield ChatResponseUpdate(text=" world", role=ChatRole.ASSISTANT)
return MockChatClient()
@pytest.mark.parametrize("model_diagnostic_settings", [(False, False)], indirect=True)
async def test_telemetry_disabled_bypasses_instrumentation(mock_chat_client, model_diagnostic_settings):
"""Test that when diagnostics are disabled, telemetry is bypassed."""
decorated_class = use_telemetry(type(mock_chat_client))
client = decorated_class()
messages = [ChatMessage(role=ChatRole.USER, text="Test message")]
chat_options = ChatOptions()
with (
patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings),
patch("agent_framework.telemetry.use_span") as mock_use_span,
):
# This should not create any spans
response = await client._inner_get_response(messages=messages, chat_options=chat_options)
assert response is not None
mock_use_span.assert_not_called()
@pytest.mark.parametrize("model_diagnostic_settings", [(True, True)], indirect=True)
async def test_instrumentation_enabled(mock_chat_client, model_diagnostic_settings):
"""Test that when diagnostics are enabled, telemetry is applied."""
decorated_class = use_telemetry(type(mock_chat_client))
client = decorated_class()
messages = [ChatMessage(role=ChatRole.USER, text="Test message")]
chat_options = ChatOptions()
with (
patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings),
patch("agent_framework.telemetry.use_span") as mock_use_span,
patch("agent_framework.telemetry.logger") as mock_logger,
):
response = await client._inner_get_response(messages=messages, chat_options=chat_options)
assert response is not None
mock_use_span.assert_called_once()
# Check that logger.info was called (telemetry logs input/output)
assert mock_logger.info.call_count == 2
@pytest.mark.parametrize("model_diagnostic_settings", [(True, False)], indirect=True)
async def test_streaming_response_with_diagnostics_enabled_via_decorator(mock_chat_client, model_diagnostic_settings):
"""Test streaming telemetry through the use_telemetry decorator."""
decorated_class = use_telemetry(type(mock_chat_client))
client = decorated_class()
messages = [ChatMessage(role=ChatRole.USER, text="Test")]
chat_options = ChatOptions()
with (
patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings),
patch("agent_framework.telemetry.use_span") as mock_use_span,
patch("agent_framework.telemetry._get_chat_response_span") as mock_get_span,
patch("agent_framework.telemetry._set_chat_response_input") as mock_set_input,
patch("agent_framework.telemetry._set_chat_response_output") as mock_set_output,
):
mock_span = Mock()
mock_use_span.return_value.__enter__.return_value = mock_span
mock_use_span.return_value.__exit__.return_value = None
# We can't easily mock ChatResponse.from_chat_response_updates since it's imported locally,
# but we can verify telemetry calls were made
# Collect all yielded updates
updates = []
async for update in client._inner_get_streaming_response(messages=messages, chat_options=chat_options):
updates.append(update)
# Verify we got the expected updates
assert len(updates) == 2
# Verify telemetry calls were made
mock_get_span.assert_called_once()
mock_set_input.assert_called_once_with("test_provider", messages)
mock_set_output.assert_called_once()
@pytest.mark.parametrize("model_diagnostic_settings", [(True, False)], indirect=True)
async def test_streaming_response_with_exception_via_decorator(mock_chat_client, model_diagnostic_settings):
"""Test streaming telemetry exception handling through decorator."""
async def _inner_get_streaming_response(
self, *, messages: MutableSequence[ChatMessage], chat_options: ChatOptions, **kwargs: Any
) -> AsyncIterable[ChatResponseUpdate]:
yield ChatResponseUpdate(text="Partial", role=ChatRole.ASSISTANT)
raise ValueError("Test streaming error")
type(mock_chat_client)._inner_get_streaming_response = _inner_get_streaming_response
decorated_class = use_telemetry(type(mock_chat_client))
client = decorated_class()
messages = [ChatMessage(role=ChatRole.USER, text="Test")]
chat_options = ChatOptions()
with (
patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings),
patch("agent_framework.telemetry.use_span") as mock_use_span,
patch("agent_framework.telemetry._get_chat_response_span"),
patch("agent_framework.telemetry._set_chat_response_input"),
patch("agent_framework.telemetry._set_chat_response_error") as mock_set_error,
):
mock_span = Mock()
mock_use_span.return_value.__enter__.return_value = mock_span
mock_use_span.return_value.__exit__.return_value = None
# Should raise the exception and call error handler
with pytest.raises(ValueError, match="Test streaming error"):
async for _ in client._inner_get_streaming_response(messages=messages, chat_options=chat_options):
pass
# Verify error was recorded
mock_set_error.assert_called_once()
assert isinstance(mock_set_error.call_args[0][1], ValueError)
@pytest.mark.parametrize("model_diagnostic_settings", [(False, False)], indirect=True)
async def test_streaming_response_diagnostics_disabled_via_decorator(model_diagnostic_settings):
"""Test streaming response when diagnostics are disabled."""
from agent_framework import ChatResponseUpdate
class MockStreamingClientNoDiagnostics:
MODEL_PROVIDER_NAME = "test_provider"
async def _inner_get_streaming_response(
self, *, messages: MutableSequence[ChatMessage], chat_options: ChatOptions, **kwargs: Any
) -> AsyncIterable[ChatResponseUpdate]:
yield ChatResponseUpdate(text="Test", role=ChatRole.ASSISTANT)
decorated_class = use_telemetry(MockStreamingClientNoDiagnostics)
client = decorated_class()
messages = [ChatMessage(role=ChatRole.USER, text="Test")]
chat_options = ChatOptions()
with (
patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings),
patch("agent_framework.telemetry._get_chat_response_span") as mock_get_span,
):
# Should not create spans when diagnostics are disabled
updates = []
async for update in client._inner_get_streaming_response(messages=messages, chat_options=chat_options):
updates.append(update)
assert len(updates) == 1
# Should not have called telemetry functions
mock_get_span.assert_not_called()
# region Test empty streaming response handling
@pytest.mark.parametrize("model_diagnostic_settings", [(True, False)], indirect=True)
async def test_empty_streaming_response_via_decorator(model_diagnostic_settings):
"""Test streaming wrapper with empty response."""
class MockEmptyStreamingClient:
MODEL_PROVIDER_NAME = "test_provider"
def __init__(self):
self.ai_model_id = "test_model"
def service_url(self) -> str:
return "https://test.com"
async def _inner_get_streaming_response(
self, *, messages: MutableSequence[ChatMessage], chat_options: ChatOptions, **kwargs: Any
) -> AsyncIterable[ChatResponseUpdate]:
# Return empty stream
return
yield # This will never be reached
decorated_class = use_telemetry(MockEmptyStreamingClient)
client = decorated_class()
messages = [ChatMessage(role=ChatRole.USER, text="Test")]
chat_options = ChatOptions()
with (
patch("agent_framework.telemetry.MODEL_DIAGNOSTICS_SETTINGS", model_diagnostic_settings),
patch("agent_framework.telemetry.use_span") as mock_use_span,
patch("agent_framework.telemetry._get_chat_response_span"),
patch("agent_framework.telemetry._set_chat_response_input"),
patch("agent_framework.telemetry._set_chat_response_output") as mock_set_output,
):
mock_span = Mock()
mock_use_span.return_value.__enter__.return_value = mock_span
mock_use_span.return_value.__exit__.return_value = None
# Should handle empty stream gracefully
updates = []
async for update in client._inner_get_streaming_response(messages=messages, chat_options=chat_options):
updates.append(update)
assert len(updates) == 0
# Should still call telemetry
mock_set_output.assert_called_once()
def test_start_as_current_span_with_none_metadata():
"""Test start_as_current_span with None metadata."""
mock_tracer = Mock()
mock_span = Mock()
mock_tracer.start_as_current_span.return_value = mock_span
mock_function = Mock()
mock_function.name = "test_function"
mock_function.description = "Test description"
result = start_as_current_span(mock_tracer, mock_function, None)
assert result == mock_span
call_args = mock_tracer.start_as_current_span.call_args
attributes = call_args[1]["attributes"]
assert GenAIAttributes.TOOL_CALL_ID.value not in attributes
def test_prepend_user_agent_with_none_value():
"""Test prepend user agent with None value in headers."""
headers = {"User-Agent": None}
result = prepend_agent_framework_to_user_agent(headers)
# Should handle None gracefully
assert "User-Agent" in result
assert AGENT_FRAMEWORK_USER_AGENT in str(result["User-Agent"])
@@ -0,0 +1,290 @@
# Copyright (c) Microsoft. All rights reserved.
from unittest.mock import Mock, patch
import pytest
from pydantic import BaseModel
from agent_framework import AIFunction, AITool, ai_function
from agent_framework.telemetry import GenAIAttributes
def test_ai_function_decorator():
"""Test the ai_function decorator."""
@ai_function(name="test_tool", description="A test tool")
def test_tool(x: int, y: int) -> int:
"""A simple function that adds two numbers."""
return x + y
assert isinstance(test_tool, AITool)
assert isinstance(test_tool, AIFunction)
assert test_tool.name == "test_tool"
assert test_tool.description == "A test tool"
assert test_tool.parameters() == {
"properties": {"x": {"title": "X", "type": "integer"}, "y": {"title": "Y", "type": "integer"}},
"required": ["x", "y"],
"title": "test_tool_input",
"type": "object",
}
assert test_tool(1, 2) == 3
def test_ai_function_decorator_without_args():
"""Test the ai_function decorator."""
@ai_function
def test_tool(x: int, y: int) -> int:
"""A simple function that adds two numbers."""
return x + y
assert isinstance(test_tool, AITool)
assert isinstance(test_tool, AIFunction)
assert test_tool.name == "test_tool"
assert test_tool.description == "A simple function that adds two numbers."
assert test_tool.parameters() == {
"properties": {"x": {"title": "X", "type": "integer"}, "y": {"title": "Y", "type": "integer"}},
"required": ["x", "y"],
"title": "test_tool_input",
"type": "object",
}
assert test_tool(1, 2) == 3
async def test_ai_function_decorator_with_async():
"""Test the ai_function decorator with an async function."""
@ai_function(name="async_test_tool", description="An async test tool")
async def async_test_tool(x: int, y: int) -> int:
"""An async function that adds two numbers."""
return x + y
assert isinstance(async_test_tool, AITool)
assert isinstance(async_test_tool, AIFunction)
assert async_test_tool.name == "async_test_tool"
assert async_test_tool.description == "An async test tool"
assert async_test_tool.parameters() == {
"properties": {"x": {"title": "X", "type": "integer"}, "y": {"title": "Y", "type": "integer"}},
"required": ["x", "y"],
"title": "async_test_tool_input",
"type": "object",
}
assert (await async_test_tool(1, 2)) == 3
# Telemetry tests for AIFunction
async def test_ai_function_invoke_telemetry_enabled():
"""Test the ai_function invoke method with telemetry enabled."""
@ai_function(name="telemetry_test_tool", description="A test tool for telemetry")
def telemetry_test_tool(x: int, y: int) -> int:
"""A function that adds two numbers for telemetry testing."""
return x + y
# Mock the tracer and span
with (
patch("agent_framework._tools.tracer") as mock_tracer,
patch("agent_framework._tools.start_as_current_span") as mock_start_span,
):
mock_span = Mock()
mock_context_manager = Mock()
mock_context_manager.__enter__ = Mock(return_value=mock_span)
mock_context_manager.__exit__ = Mock(return_value=None)
mock_start_span.return_value = mock_context_manager
# Mock the histogram
mock_histogram = Mock()
telemetry_test_tool.invocation_duration_histogram = mock_histogram
# Call invoke
result = await telemetry_test_tool.invoke(x=1, y=2, tool_call_id="test_call_id")
# Verify result
assert result == 3
# Verify telemetry calls
mock_start_span.assert_called_once_with(
mock_tracer, telemetry_test_tool, metadata={"tool_call_id": "test_call_id", "kwargs": {"x": 1, "y": 2}}
)
# Verify histogram was called with correct attributes
mock_histogram.record.assert_called_once()
call_args = mock_histogram.record.call_args
assert call_args[0][0] > 0 # duration should be positive
attributes = call_args[1]["attributes"]
assert attributes[GenAIAttributes.MEASUREMENT_FUNCTION_TAG_NAME.value] == "telemetry_test_tool"
assert attributes[GenAIAttributes.TOOL_CALL_ID.value] == "test_call_id"
async def test_ai_function_invoke_telemetry_with_pydantic_args():
"""Test the ai_function invoke method with Pydantic model arguments."""
@ai_function(name="pydantic_test_tool", description="A test tool with Pydantic args")
def pydantic_test_tool(x: int, y: int) -> int:
"""A function that adds two numbers using Pydantic args."""
return x + y
# Create arguments as Pydantic model instance
args_model = pydantic_test_tool.input_model(x=5, y=10)
with (
patch("agent_framework._tools.tracer") as mock_tracer,
patch("agent_framework._tools.start_as_current_span") as mock_start_span,
):
mock_span = Mock()
mock_context_manager = Mock()
mock_context_manager.__enter__ = Mock(return_value=mock_span)
mock_context_manager.__exit__ = Mock(return_value=None)
mock_start_span.return_value = mock_context_manager
mock_histogram = Mock()
pydantic_test_tool.invocation_duration_histogram = mock_histogram
# Call invoke with Pydantic model
result = await pydantic_test_tool.invoke(arguments=args_model, tool_call_id="pydantic_call")
# Verify result
assert result == 15
# Verify telemetry calls
mock_start_span.assert_called_once_with(
mock_tracer, pydantic_test_tool, metadata={"tool_call_id": "pydantic_call", "kwargs": {"x": 5, "y": 10}}
)
async def test_ai_function_invoke_telemetry_with_exception():
"""Test the ai_function invoke method with telemetry when an exception occurs."""
@ai_function(name="exception_test_tool", description="A test tool that raises an exception")
def exception_test_tool(x: int, y: int) -> int:
"""A function that raises an exception for telemetry testing."""
raise ValueError("Test exception for telemetry")
with (
patch("agent_framework._tools.tracer"),
patch("agent_framework._tools.start_as_current_span") as mock_start_span,
):
mock_span = Mock()
mock_context_manager = Mock()
mock_context_manager.__enter__ = Mock(return_value=mock_span)
mock_context_manager.__exit__ = Mock(return_value=None)
mock_start_span.return_value = mock_context_manager
mock_histogram = Mock()
exception_test_tool.invocation_duration_histogram = mock_histogram
# Call invoke and expect exception
with pytest.raises(ValueError, match="Test exception for telemetry"):
await exception_test_tool.invoke(x=1, y=2, tool_call_id="exception_call")
# Verify telemetry calls
mock_start_span.assert_called_once()
# Verify span exception recording
mock_span.record_exception.assert_called_once()
mock_span.set_attribute.assert_called()
mock_span.set_status.assert_called_once()
# Verify histogram was called with error attributes
mock_histogram.record.assert_called_once()
call_args = mock_histogram.record.call_args
attributes = call_args[1]["attributes"]
assert attributes[GenAIAttributes.ERROR_TYPE.value] == "ValueError"
async def test_ai_function_invoke_telemetry_async_function():
"""Test the ai_function invoke method with telemetry on async function."""
@ai_function(name="async_telemetry_test", description="An async test tool for telemetry")
async def async_telemetry_test(x: int, y: int) -> int:
"""An async function for telemetry testing."""
return x * y
with (
patch("agent_framework._tools.tracer") as mock_tracer,
patch("agent_framework._tools.start_as_current_span") as mock_start_span,
):
mock_span = Mock()
mock_context_manager = Mock()
mock_context_manager.__enter__ = Mock(return_value=mock_span)
mock_context_manager.__exit__ = Mock(return_value=None)
mock_start_span.return_value = mock_context_manager
mock_histogram = Mock()
async_telemetry_test.invocation_duration_histogram = mock_histogram
# Call invoke
result = await async_telemetry_test.invoke(x=3, y=4, tool_call_id="async_call")
# Verify result
assert result == 12
# Verify telemetry calls
mock_start_span.assert_called_once_with(
mock_tracer, async_telemetry_test, metadata={"tool_call_id": "async_call", "kwargs": {"x": 3, "y": 4}}
)
# Verify histogram recording
mock_histogram.record.assert_called_once()
call_args = mock_histogram.record.call_args
attributes = call_args[1]["attributes"]
assert attributes[GenAIAttributes.MEASUREMENT_FUNCTION_TAG_NAME.value] == "async_telemetry_test"
async def test_ai_function_invoke_telemetry_no_tool_call_id():
"""Test the ai_function invoke method with telemetry when no tool_call_id is provided."""
@ai_function(name="no_id_test_tool", description="A test tool without tool_call_id")
def no_id_test_tool(x: int) -> int:
"""A function for testing without tool_call_id."""
return x * 2
with (
patch("agent_framework._tools.tracer") as mock_tracer,
patch("agent_framework._tools.start_as_current_span") as mock_start_span,
):
mock_span = Mock()
mock_context_manager = Mock()
mock_context_manager.__enter__ = Mock(return_value=mock_span)
mock_context_manager.__exit__ = Mock(return_value=None)
mock_start_span.return_value = mock_context_manager
mock_histogram = Mock()
no_id_test_tool.invocation_duration_histogram = mock_histogram
# Call invoke without tool_call_id
result = await no_id_test_tool.invoke(x=5)
# Verify result
assert result == 10
# Verify telemetry calls
mock_start_span.assert_called_once_with(
mock_tracer, no_id_test_tool, metadata={"tool_call_id": None, "kwargs": {"x": 5}}
)
# Verify histogram attributes
mock_histogram.record.assert_called_once()
call_args = mock_histogram.record.call_args
attributes = call_args[1]["attributes"]
assert attributes[GenAIAttributes.TOOL_CALL_ID.value] is None
async def test_ai_function_invoke_invalid_pydantic_args():
"""Test the ai_function invoke method with invalid Pydantic model arguments."""
@ai_function(name="invalid_args_test", description="A test tool for invalid args")
def invalid_args_test(x: int, y: int) -> int:
"""A function for testing invalid Pydantic args."""
return x + y
# Create a different Pydantic model
class WrongModel(BaseModel):
a: str
b: str
wrong_args = WrongModel(a="hello", b="world")
# Call invoke with wrong model type
with pytest.raises(TypeError, match="Expected invalid_args_test_input, got WrongModel"):
await invalid_args_test.invoke(arguments=wrong_args)
@@ -1,10 +1,12 @@
# Copyright (c) Microsoft. All rights reserved.
from collections.abc import MutableSequence
from typing import Any
from pydantic import BaseModel, ValidationError
from pytest import fixture, mark, raises
# region: TextContent
from agent_framework import (
AgentRunResponse,
AgentRunResponseUpdate,
@@ -26,9 +28,38 @@ from agent_framework import (
TextReasoningContent,
UriContent,
UsageDetails,
ai_function,
)
# region: TextContent
@fixture
def ai_tool() -> AITool:
"""Returns a generic AITool."""
class GenericTool(BaseModel):
name: str
description: str | None = None
additional_properties: dict[str, Any] | None = None
def parameters(self) -> dict[str, Any]:
"""Return the parameters of the tool as a JSON schema."""
return {
"name": {"type": "string"},
}
return GenericTool(name="generic_tool", description="A generic tool")
@fixture
def ai_function_tool() -> AITool:
"""Returns a executable AITool."""
@ai_function
def simple_function(x: int, y: int) -> int:
"""A simple function that adds two numbers."""
return x + y
return simple_function
def test_text_content_positional():
@@ -26,6 +26,11 @@ async def mock_async_process_chat_stream_response(_):
yield mock_content, None
@pytest.fixture(scope="function")
def chat_history() -> list[ChatMessage]:
return []
@pytest.fixture
def mock_chat_completion_response() -> ChatCompletion:
return ChatCompletion(
-66
View File
@@ -1,66 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
from agent_framework import AIFunction, AITool, ai_function
def test_ai_function_decorator():
"""Test the ai_function decorator."""
@ai_function(name="test_tool", description="A test tool")
def test_tool(x: int, y: int) -> int:
"""A simple function that adds two numbers."""
return x + y
assert isinstance(test_tool, AITool)
assert isinstance(test_tool, AIFunction)
assert test_tool.name == "test_tool"
assert test_tool.description == "A test tool"
assert test_tool.parameters() == {
"properties": {"x": {"title": "X", "type": "integer"}, "y": {"title": "Y", "type": "integer"}},
"required": ["x", "y"],
"title": "test_tool_input",
"type": "object",
}
assert test_tool(1, 2) == 3
def test_ai_function_decorator_without_args():
"""Test the ai_function decorator."""
@ai_function
def test_tool(x: int, y: int) -> int:
"""A simple function that adds two numbers."""
return x + y
assert isinstance(test_tool, AITool)
assert isinstance(test_tool, AIFunction)
assert test_tool.name == "test_tool"
assert test_tool.description == "A simple function that adds two numbers."
assert test_tool.parameters() == {
"properties": {"x": {"title": "X", "type": "integer"}, "y": {"title": "Y", "type": "integer"}},
"required": ["x", "y"],
"title": "test_tool_input",
"type": "object",
}
assert test_tool(1, 2) == 3
async def test_ai_function_decorator_with_async():
"""Test the ai_function decorator with an async function."""
@ai_function(name="async_test_tool", description="An async test tool")
async def async_test_tool(x: int, y: int) -> int:
"""An async function that adds two numbers."""
return x + y
assert isinstance(async_test_tool, AITool)
assert isinstance(async_test_tool, AIFunction)
assert async_test_tool.name == "async_test_tool"
assert async_test_tool.description == "An async test tool"
assert async_test_tool.parameters() == {
"properties": {"x": {"title": "X", "type": "integer"}, "y": {"title": "Y", "type": "integer"}},
"required": ["x", "y"],
"title": "async_test_tool_input",
"type": "object",
}
assert (await async_test_tool(1, 2)) == 3
View File
@@ -0,0 +1,4 @@
CONNECTION_STRING="..."
OTLP_ENDPOINT="http://localhost:4317/"
AGENT_FRAMEWORK_GENAI_ENABLE_OTEL_DIAGNOSTICS=true
AGENT_FRAMEWORK_GENAI_ENABLE_OTEL_DIAGNOSTICS_SENSITIVE=true
@@ -0,0 +1,244 @@
# Agent Framework Python Telemetry
This sample project shows how a Python application can be configured to send Agent Framework telemetry to the Application Performance Management (APM) vendors of your choice.
In this sample, we provide options to send telemetry to [Application Insights](https://learn.microsoft.com/en-us/azure/azure-monitor/app/app-insights-overview), [Aspire Dashboard](https://learn.microsoft.com/en-us/dotnet/aspire/fundamentals/dashboard/overview?tabs=bash), and console output.
> Note that it is also possible to use other Application Performance Management (APM) vendors. An example is [Prometheus](https://prometheus.io/docs/introduction/overview/). Please refer to this [link](https://opentelemetry.io/docs/languages/python/exporters/) to learn more about exporters.
For more information, please refer to the following resources:
1. [Azure Monitor OpenTelemetry Exporter](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/monitor/azure-monitor-opentelemetry-exporter)
2. [Aspire Dashboard for Python Apps](https://learn.microsoft.com/en-us/dotnet/aspire/fundamentals/dashboard/standalone-for-python?tabs=flask%2Cwindows)
3. [Python Logging](https://docs.python.org/3/library/logging.html)
4. [Observability in Python](https://www.cncf.io/blog/2022/04/22/opentelemetry-and-python-a-complete-instrumentation-guide/)
## What to expect
The Agent Framework Python SDK is designed to efficiently generate comprehensive logs, traces, and metrics throughout the flow of function execution and model invocation. This allows you to effectively monitor your AI application's performance and accurately track token consumption.
## Configuration
### Required resources
2. OpenAI or [Azure OpenAI](https://learn.microsoft.com/en-us/azure/ai-services/openai/how-to/create-resource?pivots=web-portal)
### Optional resources
1. [Application Insights](https://learn.microsoft.com/en-us/azure/azure-monitor/app/create-workspace-resource)
2. [Aspire Dashboard](https://learn.microsoft.com/en-us/dotnet/aspire/fundamentals/dashboard/standalone-for-python?tabs=flask%2Cwindows#start-the-aspire-dashboard)
### Dependencies
You will also need to install the following dependencies to your virtual environment to run this sample:
```bash
# For Azure ApplicationInsights/AzureMonitor
uv pip install azure-monitor-opentelemetry azure-monitor-opentelemetry-exporter
# For OTLP endpoint
uv pip install opentelemetry-exporter-otlp-proto-grpc
```
## Running the sample
1. Open a terminal and navigate to this folder: `python/samples/getting_started/telemetry/`. This is necessary for the `.env` file to be read correctly.
2. Create a `.env` file if one doesn't already exist in this folder. Please refer to the [example file](./.env.example).
> Note that `CONNECTION_STRING` and `SAMPLE_OTLP_ENDPOINT` are optional. If you don't configure them, everything will get outputted to the console.
> Set `AGENT_FRAMEWORK_GENAI_ENABLE_OTEL_DIAGNOSTICS=true` to enable basic telemetry and `AGENT_FRAMEWORK_GENAI_ENABLE_OTEL_DIAGNOSTICS_SENSITIVE=true` to include sensitive information like prompts and responses.
> Sensitive information should only be enabled in a development or test environment. It is not recommended to enable this in production environments as it may expose sensitive data.
3. Activate your python virtual environment, and then run `python scenarios.py` or `python interactive.py`.
> This will output the Operation/Trace ID, which can be used later for filtering.
### Scenarios
This sample includes two different applications demonstrating Agent Framework telemetry:
#### scenarios.py
Organized into specific scenarios where the framework will generate useful telemetry data:
- `chat_client`: This is when a chat client is invoked directly (i.e. not streaming) with a weather tool function. **Information about the call to the underlying model and tool usage will be recorded**.
- `chat_client_stream`: This is when a chat client is invoked with streaming enabled and a weather tool function. **Information about the streaming call to the underlying model and tool usage will be recorded**.
- `ai_function`: This is when an AI function (`get_weather`) is invoked directly. **Information about the AI function and the call to the underlying model will be recorded**.
By default, running `python scenarios.py` will run all three scenarios. To run individual scenarios, use the `--scenario` command line argument. For example, `python scenarios.py --scenario chat_client`. For more information, please run `python scenarios.py -h`.
#### interactive.py
An interactive chat application that demonstrates telemetry collection in a conversational context. This sample includes the same `get_weather` tool function and allows for multi-turn conversations. Run `python interactive.py` and start chatting. Type 'exit' to quit the application. This sample only logs at the `WARNING` level, so you will not see as much telemetry data as in the `scenarios.py` sample.
## Application Insights/Azure Monitor
### Logs and traces
Go to your Application Insights instance, click on _Transaction search_ on the left menu. Use the operation id output by the program to search for the logs and traces associated with the operation. Click on any of the search result to view the end-to-end transaction details. Read more [here](https://learn.microsoft.com/en-us/azure/azure-monitor/app/transaction-search-and-diagnostics?tabs=transaction-search).
### Metrics
Running the application once will only generate one set of measurements (for each metrics). Run the application a couple times to generate more sets of measurements.
> Note: Make sure not to run the program too frequently. Otherwise, you may get throttled.
Please refer to here on how to analyze metrics in [Azure Monitor](https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/analyze-metrics).
## Logs
When you are in Azure Monitor and want to have a overall view of the span, use this query in the logs section:
```kusto
dependencies
| where operation_Id in (dependencies
| project operation_Id, timestamp
| order by timestamp desc
| summarize operations = make_set(operation_Id), timestamp = max(timestamp) by operation_Id
| order by timestamp desc
| project operation_Id
| take 2)
| evaluate bag_unpack(customDimensions)
| extend tool_call_id = tostring(["gen_ai.tool.call.id"])
| join kind=leftouter (customMetrics
| extend tool_call_id = tostring(customDimensions['gen_ai.tool.call.id'])
| where isnotempty(tool_call_id)
| project tool_call_duration = value, tool_call_id)
on tool_call_id
| project-keep timestamp, target, operation_Id, tool_call_duration, duration, gen_ai*
| order by timestamp asc
```
## Aspire Dashboard
> Make sure you have the dashboard running to receive telemetry data.
Once the the sample finishes running, navigate to http://localhost:18888 in a web browser to see the telemetry data. Follow the instructions [here](https://learn.microsoft.com/en-us/dotnet/aspire/fundamentals/dashboard/explore) to authenticate to the dashboard and start exploring!
## Console output
You won't have to deploy an Application Insights resource or install Docker to run Aspire Dashboard if you choose to inspect telemetry data in a console. However, it is difficult to navigate through all the spans and logs produced, so **this method is only recommended when you are just getting started**.
We recommend you to get started with the `chat_client` scenario as this generates the least amount of telemetry data. Below is similar to what you will see when you run `python scenarios.py --scenario chat_client`:
```Json
{
"name": "chat.completions gpt-4o",
"context": {
"trace_id": "0xbda1d9efcd65435653d18fa37aef7dd3",
"span_id": "0xcd443e1917510385",
"trace_state": "[]"
},
"kind": "SpanKind.INTERNAL",
"parent_id": "0xeca0a2ca7b7a8191",
"start_time": "2024-09-09T23:13:14.625156Z",
"end_time": "2024-09-09T23:13:17.311909Z",
"status": {
"status_code": "UNSET"
},
"attributes": {
"gen_ai.operation.name": "chat.completions",
"gen_ai.system": "openai",
"gen_ai.request.model": "gpt-4o",
"gen_ai.response.id": "chatcmpl-A5hrG13nhtFsOgx4ziuoskjNscHtT",
"gen_ai.response.finish_reason": "FinishReason.STOP",
"gen_ai.response.prompt_tokens": 16,
"gen_ai.response.completion_tokens": 28
},
"events": [
{
"name": "gen_ai.content.prompt",
"timestamp": "2024-09-09T23:13:14.625156Z",
"attributes": {
"gen_ai.prompt": "[{\"role\": \"user\", \"content\": \"Why is the sky blue in one sentence?\"}]"
}
},
{
"name": "gen_ai.content.completion",
"timestamp": "2024-09-09T23:13:17.311909Z",
"attributes": {
"gen_ai.completion": "[{\"role\": \"assistant\", \"content\": \"The sky appears blue because molecules in the Earth's atmosphere scatter shorter wavelengths of sunlight, such as blue, more effectively than longer wavelengths like red.\"}]"
}
}
],
"links": [],
"resource": {
"attributes": {
"telemetry.sdk.language": "python",
"telemetry.sdk.name": "opentelemetry",
"telemetry.sdk.version": "1.26.0",
"service.name": "TelemetryExample"
},
"schema_url": ""
}
}
{
"name": "Scenario: Chat Client",
"context": {
"trace_id": "0xbda1d9efcd65435653d18fa37aef7dd3",
"span_id": "0xeca0a2ca7b7a8191",
"trace_state": "[]"
},
"kind": "SpanKind.INTERNAL",
"parent_id": "0x48af7ad55f2f64b5",
"start_time": "2024-09-09T23:13:14.625156Z",
"end_time": "2024-09-09T23:13:17.312910Z",
"status": {
"status_code": "UNSET"
},
"attributes": {},
"events": [],
"links": [],
"resource": {
"attributes": {
"telemetry.sdk.language": "python",
"telemetry.sdk.name": "opentelemetry",
"telemetry.sdk.version": "1.26.0",
"service.name": "TelemetryExample"
},
"schema_url": ""
}
}
{
"name": "Scenario's",
"context": {
"trace_id": "0xbda1d9efcd65435653d18fa37aef7dd3",
"span_id": "0x48af7ad55f2f64b5",
"trace_state": "[]"
},
"kind": "SpanKind.INTERNAL",
"parent_id": null,
"start_time": "2024-09-09T23:13:13.840481Z",
"end_time": "2024-09-09T23:13:17.312910Z",
"status": {
"status_code": "UNSET"
},
"attributes": {},
"events": [],
"links": [],
"resource": {
"attributes": {
"telemetry.sdk.language": "python",
"telemetry.sdk.name": "opentelemetry",
"telemetry.sdk.version": "1.26.0",
"service.name": "TelemetryExample"
},
"schema_url": ""
}
}
{
"body": "Agent Framework usage: CompletionUsage(completion_tokens=28, prompt_tokens=16, total_tokens=44)",
"severity_number": "<SeverityNumber.INFO: 9>",
"severity_text": "INFO",
"attributes": {
"code.filepath": "/path/to/agent_framework/openai/chat_client.py",
"code.function": "store_usage",
"code.lineno": 81
},
"dropped_attributes": 0,
"timestamp": "2024-09-09T23:13:17.311909Z",
"observed_timestamp": "2024-09-09T23:13:17.311909Z",
"trace_id": "0xbda1d9efcd65435653d18fa37aef7dd3",
"span_id": "0xcd443e1917510385",
"trace_flags": 1,
"resource": {
"attributes": {
"telemetry.sdk.language": "python",
"telemetry.sdk.name": "opentelemetry",
"telemetry.sdk.version": "1.26.0",
"service.name": "TelemetryExample"
},
"schema_url": ""
}
}
```
In the output, you will find three spans: `Scenario's`, `Scenario: Chat Client`, and `chat.completions gpt-4o`, each representing a different layer in the sample. In particular, `chat.completions gpt-4o` is generated by the chat client. Inside it, you will find information about the call, such as the timestamp of the operation, the response id and the finish reason. You will also find sensitive information such as the prompt and response to and from the model (only if you have `AGENT_FRAMEWORK__GENAI_ENABLE_OTEL_DIAGNOSTICS_SENSITIVE` set to true). If you use Application Insights or Aspire Dashboard, these information will be available to you in an interactive UI.
@@ -0,0 +1,186 @@
# Copyright (c) Microsoft. All rights reserved.
# type: ignore
import asyncio
import logging
from random import randint
from typing import Annotated
from agent_framework import ChatMessage, ChatResponse, ChatResponseUpdate
from agent_framework.openai import OpenAIChatClient
from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry import trace
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.view import DropAggregation, View
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.semconv.attributes import service_attributes
from opentelemetry.trace import set_tracer_provider
from pydantic import Field
from pydantic_settings import BaseSettings
class TelemetrySampleSettings(BaseSettings):
"""Settings for the telemetry sample application.
Optional settings are:
- connection_string: str - The connection string for the Application Insights resource.
This value can be found in the Overview section when examining
your resource from the Azure portal.
(Env var CONNECTION_STRING)
- otlp_endpoint: str - The OTLP endpoint to send telemetry data to.
Depending on the exporter used, you may find this value in different places.
(Env var OTLP_ENDPOINT)
If no connection string or OTLP endpoint is provided, the telemetry data will be
exported to the console.
"""
connection_string: str | None = None
otlp_endpoint: str | None = None
# Load settings
settings = TelemetrySampleSettings()
# Create a resource to represent the service/sample
resource = Resource.create({service_attributes.SERVICE_NAME: "TelemetryExample"})
# Define the scenarios that can be run
SCENARIOS = ["ai_service", "kernel_function", "auto_function_invocation", "all"]
if settings.connection_string:
configure_azure_monitor(
connection_string=settings.connection_string, enable_live_metrics=True, logger_name="agent_framework"
)
def set_up_logging():
class LogFilter(logging.Filter):
"""A filter to not process records from several subpackages."""
# These are the namespaces that we want to exclude from logging for the purposes of this demo.
namespaces_to_exclude: list[str] = [
"httpx",
"openai",
]
def filter(self, record):
return not any([record.name.startswith(namespace) for namespace in self.namespaces_to_exclude])
exporters = []
if settings.otlp_endpoint:
exporters.append(OTLPLogExporter(endpoint=settings.otlp_endpoint))
if not exporters:
exporters.append(ConsoleLogExporter())
# Create and set a global logger provider for the application.
logger_provider = LoggerProvider(resource=resource)
# Log processors are initialized with an exporter which is responsible
# for sending the telemetry data to a particular backend.
for log_exporter in exporters:
logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
# Sets the global default logger provider
set_logger_provider(logger_provider)
# Create a logging handler to write logging records, in OTLP format, to the exporter.
handler = LoggingHandler()
handler.addFilter(LogFilter())
# Attach the handler to the root logger. `getLogger()` with no arguments returns the root logger.
# Events from all child loggers will be processed by this handler.
logger = logging.getLogger()
logger.addHandler(handler)
# Set the logging level to NOTSET to allow all records to be processed by the handler.
logger.setLevel(logging.WARNING)
def set_up_tracing():
exporters = []
if settings.otlp_endpoint:
exporters.append(OTLPSpanExporter(endpoint=settings.otlp_endpoint))
if not exporters:
exporters.append(ConsoleSpanExporter())
# Initialize a trace provider for the application. This is a factory for creating tracers.
tracer_provider = TracerProvider(resource=resource)
# Span processors are initialized with an exporter which is responsible
# for sending the telemetry data to a particular backend.
for exporter in exporters:
tracer_provider.add_span_processor(BatchSpanProcessor(exporter))
# Sets the global default tracer provider
set_tracer_provider(tracer_provider)
def set_up_metrics():
exporters = []
if settings.otlp_endpoint:
exporters.append(OTLPMetricExporter(endpoint=settings.otlp_endpoint))
if not exporters:
exporters.append(ConsoleMetricExporter())
# Initialize a metric provider for the application. This is a factory for creating meters.
metric_readers = [
PeriodicExportingMetricReader(metric_exporter, export_interval_millis=5000) for metric_exporter in exporters
]
meter_provider = MeterProvider(
metric_readers=metric_readers,
resource=resource,
views=[
# Dropping all instrument names except for those starting with "agent_framework"
View(instrument_name="*", aggregation=DropAggregation()),
View(instrument_name="agent_framework*"),
],
)
# Sets the global default meter provider
set_meter_provider(meter_provider)
def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
async def main():
# Set up the providers
# This must be done before any other telemetry calls
set_up_logging()
set_up_tracing()
set_up_metrics()
tracer = trace.get_tracer("agent_framework")
with tracer.start_as_current_span("Scenario: Interactive Chat") as current_span:
print("Running scenario: Interactive Chat")
print("Welcome to the chat, type 'exit' to quit.")
client = OpenAIChatClient()
messages: list[ChatMessage] = []
message = input("User: ")
try:
while message.lower() != "exit":
messages.append(ChatMessage(role="user", text=message))
print("Assistant: ", end="")
updates: list[ChatResponseUpdate] = []
async for update in client.get_streaming_response(messages, tools=get_weather):
updates.append(update)
if update.text:
print(update.text, end="")
print("")
messages.extend(ChatResponse.from_chat_response_updates(updates).messages)
message = input("User: ")
except Exception as e:
current_span.record_exception(e)
print(f"\nError running interactive chat: {e}")
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,249 @@
# Copyright (c) Microsoft. All rights reserved.
# type: ignore
import argparse
import asyncio
import logging
from random import randint
from typing import Annotated, Literal
from agent_framework import ai_function
from agent_framework.openai import OpenAIChatClient
from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry import trace
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.view import DropAggregation, View
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.semconv.attributes import service_attributes
from opentelemetry.trace import set_tracer_provider
from opentelemetry.trace.span import format_trace_id
from pydantic import Field
from pydantic_settings import BaseSettings
class TelemetrySampleSettings(BaseSettings):
"""Settings for the telemetry sample application.
Optional settings are:
- connection_string: str - The connection string for the Application Insights resource.
This value can be found in the Overview section when examining
your resource from the Azure portal.
(Env var CONNECTION_STRING)
- otlp_endpoint: str - The OTLP endpoint to send telemetry data to.
Depending on the exporter used, you may find this value in different places.
(Env var OTLP_ENDPOINT)
If no connection string or OTLP endpoint is provided, the telemetry data will be
exported to the console.
"""
connection_string: str | None = None
otlp_endpoint: str | None = None
# Load settings
settings = TelemetrySampleSettings()
# Create a resource to represent the service/sample
resource = Resource.create({service_attributes.SERVICE_NAME: "TelemetryExample"})
# Define the scenarios that can be run
SCENARIOS = ["chat_client", "chat_client_stream", "ai_function", "all"]
if settings.connection_string:
configure_azure_monitor(
connection_string=settings.connection_string,
enable_live_metrics=True,
logger_name="agent_framework",
)
def set_up_logging():
class LogFilter(logging.Filter):
"""A filter to not process records from several subpackages."""
# These are the namespaces that we want to exclude from logging for the purposes of this demo.
namespaces_to_exclude: list[str] = [
"httpx",
"openai",
]
def filter(self, record):
return not any([record.name.startswith(namespace) for namespace in self.namespaces_to_exclude])
exporters = []
if settings.otlp_endpoint:
exporters.append(OTLPLogExporter(endpoint=settings.otlp_endpoint))
if not exporters:
exporters.append(ConsoleLogExporter())
# Create and set a global logger provider for the application.
logger_provider = LoggerProvider(resource=resource)
# Log processors are initialized with an exporter which is responsible
# for sending the telemetry data to a particular backend.
for log_exporter in exporters:
logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
# Sets the global default logger provider
set_logger_provider(logger_provider)
# Create a logging handler to write logging records, in OTLP format, to the exporter.
handler = LoggingHandler()
handler.addFilter(LogFilter())
# Attach the handler to the root logger. `getLogger()` with no arguments returns the root logger.
# Events from all child loggers will be processed by this handler.
logger = logging.getLogger()
logger.addHandler(handler)
# Set the logging level to NOTSET to allow all records to be processed by the handler.
logger.setLevel(logging.NOTSET)
def set_up_tracing():
exporters = []
if settings.otlp_endpoint:
exporters.append(OTLPSpanExporter(endpoint=settings.otlp_endpoint))
if not exporters:
exporters.append(ConsoleSpanExporter())
# Initialize a trace provider for the application. This is a factory for creating tracers.
tracer_provider = TracerProvider(resource=resource)
# Span processors are initialized with an exporter which is responsible
# for sending the telemetry data to a particular backend.
for exporter in exporters:
tracer_provider.add_span_processor(BatchSpanProcessor(exporter))
# Sets the global default tracer provider
set_tracer_provider(tracer_provider)
def set_up_metrics():
exporters = []
if settings.otlp_endpoint:
exporters.append(OTLPMetricExporter(endpoint=settings.otlp_endpoint))
if not exporters:
exporters.append(ConsoleMetricExporter())
# Initialize a metric provider for the application. This is a factory for creating meters.
metric_readers = [
PeriodicExportingMetricReader(metric_exporter, export_interval_millis=5000) for metric_exporter in exporters
]
meter_provider = MeterProvider(
metric_readers=metric_readers,
resource=resource,
views=[
# Dropping all instrument names except for those starting with "agent_framework"
View(instrument_name="*", aggregation=DropAggregation()),
View(instrument_name="agent_framework*"),
],
)
# Sets the global default meter provider
set_meter_provider(meter_provider)
def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
async def run_chat_client(stream: bool = False) -> None:
"""Run an AI service.
This function runs an AI service and prints the output.
Telemetry will be collected for the service execution behind the scenes,
and the traces will be sent to the configured telemetry backend.
The telemetry will include information about the AI service execution.
Args:
stream (bool): Whether to use streaming for the plugin
"""
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span(
"Scenario: Chat Client Stream" if stream else "Scenario: Chat Client"
) as current_span:
print("Running scenario: Chat Client" if not stream else "Running scenario: Chat Client Stream")
try:
client = OpenAIChatClient()
message = "What's the weather in Amsterdam and in Paris?"
print(f"User: {message}")
if stream:
print("Assistant: ", end="")
async for chunk in client.get_streaming_response(message, tools=get_weather):
if str(chunk):
print(str(chunk), end="")
print("")
else:
response = await client.get_response(message, tools=get_weather)
print(f"Assistant: {response}")
except Exception as e:
current_span.record_exception(e)
print(f"Error running AI service: {e}")
async def run_ai_function() -> None:
"""Run a AI function.
This function runs a AI function and prints the output.
Telemetry will be collected for the function execution behind the scenes,
and the traces will be sent to the configured telemetry backend.
The telemetry will include information about the AI function execution
and the AI service execution.
"""
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("Scenario: AI Function") as current_span:
print("Running scenario: AI Function")
try:
func = ai_function(get_weather)
weather = await func.invoke(location="Amsterdam")
print(f"Weather in Amsterdam:\n{weather}")
except Exception as e:
current_span.record_exception(e)
print(f"Error running kernel plugin: {e}")
async def main(scenario: Literal["chat_client", "chat_client_stream", "ai_function", "all"] = "all"):
# Set up the providers
# This must be done before any other telemetry calls
set_up_logging()
set_up_tracing()
set_up_metrics()
tracer = trace.get_tracer("agent_framework")
with tracer.start_as_current_span("Scenario's") as current_span:
print(f"Trace ID: {format_trace_id(current_span.get_span_context().trace_id)}")
# Scenarios where telemetry is collected in the SDK, from the most basic to the most complex.
if scenario == "chat_client" or scenario == "all":
await run_chat_client(stream=False)
if scenario == "chat_client_stream" or scenario == "all":
await run_chat_client(stream=True)
if scenario == "ai_function" or scenario == "all":
await run_ai_function()
if __name__ == "__main__":
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument(
"--scenario",
type=str,
choices=SCENARIOS,
default="all",
help="The scenario to run. Default is all.",
)
args = arg_parser.parse_args()
asyncio.run(main(args.scenario))
+21 -3
View File
@@ -41,6 +41,8 @@ version = "0.1.0b1"
source = { editable = "packages/main" }
dependencies = [
{ name = "openai", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "opentelemetry-api", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "opentelemetry-sdk", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "pydantic", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "pydantic-settings", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "typing-extensions", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
@@ -59,6 +61,8 @@ requires-dist = [
{ name = "agent-framework-azure", marker = "extra == 'azure'", editable = "packages/azure" },
{ name = "agent-framework-foundry", marker = "extra == 'foundry'", editable = "packages/foundry" },
{ name = "openai", specifier = ">=1.94.0" },
{ name = "opentelemetry-api", specifier = "~=1.24" },
{ name = "opentelemetry-sdk", specifier = "~=1.24" },
{ name = "pydantic", specifier = ">=2.11.7" },
{ name = "pydantic-settings", specifier = ">=2.10.1" },
{ name = "typing-extensions", specifier = ">=4.14.0" },
@@ -1832,7 +1836,7 @@ wheels = [
[[package]]
name = "opentelemetry-instrumentation-openai"
version = "0.43.0"
version = "0.43.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "opentelemetry-api", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
@@ -1841,9 +1845,23 @@ dependencies = [
{ name = "opentelemetry-semantic-conventions-ai", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "tiktoken", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/4d/4b/6d327e374c4e9f35f4769515692310d5276cdf40bad268c28d2ae1865183/opentelemetry_instrumentation_openai-0.43.0.tar.gz", hash = "sha256:0329ae6eb8c5ddbf87db2110d872db7faa18f2a8ceaf2c46c9f51fc4c05c2b4f", size = 23438, upload-time = "2025-07-22T08:31:05.408Z" }
sdist = { url = "https://files.pythonhosted.org/packages/df/cf/90675fdf938c67fa362a75f5d26ad83a9982d520ea4f9a8c84e149e93665/opentelemetry_instrumentation_openai-0.43.1.tar.gz", hash = "sha256:73fb071bd1d03481adf33473f784a90101e7a813f742049fb9e5b18c11cef699", size = 23438, upload-time = "2025-07-23T14:39:46.742Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/6a/ed/40a8f791df4a6c5c4455edd1ef7d44afb6a47c7d9a5533421ff258a365db/opentelemetry_instrumentation_openai-0.43.0-py3-none-any.whl", hash = "sha256:75be471e66a2d61608a6604ff3674d3320ecfe3bc5c49f955e37b08aafbb921e", size = 33541, upload-time = "2025-07-22T08:30:36.934Z" },
{ url = "https://files.pythonhosted.org/packages/e8/d6/e253457cd2739e386a9a6f137091522d085b24239cad8ef2af80840d7959/opentelemetry_instrumentation_openai-0.43.1-py3-none-any.whl", hash = "sha256:7b4d738d7c33b8601bce7db345f16852e7c6e65253c7c23e521d31541362bc74", size = 33543, upload-time = "2025-07-23T14:39:17.995Z" },
]
[[package]]
name = "opentelemetry-sdk"
version = "1.35.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "opentelemetry-api", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "opentelemetry-semantic-conventions", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "typing-extensions", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/9a/cf/1eb2ed2ce55e0a9aa95b3007f26f55c7943aeef0a783bb006bdd92b3299e/opentelemetry_sdk-1.35.0.tar.gz", hash = "sha256:2a400b415ab68aaa6f04e8a6a9f6552908fb3090ae2ff78d6ae0c597ac581954", size = 160871, upload-time = "2025-07-11T12:23:39.566Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/01/4f/8e32b757ef3b660511b638ab52d1ed9259b666bdeeceba51a082ce3aea95/opentelemetry_sdk-1.35.0-py3-none-any.whl", hash = "sha256:223d9e5f5678518f4842311bb73966e0b6db5d1e0b74e35074c052cd2487f800", size = 119379, upload-time = "2025-07-11T12:23:24.521Z" },
]
[[package]]