Python: Improved telemetry setup (#421)

* test with stack and simplified names

* quick demo of agent decorator

* moved builder to protocol to enhance functionality

* undid chatclientAgent -> agent rename

* one more

* reverted AIAgent rename

* final reverts

* fixed foundry import

* revert changes

* streamlined otel and fcc decorators

* cleanup of telemetry

* further refinement

* lots of updates

* fixed typing

* fix for mypy

* added input and output atttributes

* fix import

* initial work on baking in otel

* major update to telemetry

* final fixes after rename

* fix

* fix test

* updated tests

* fix for tests

* fixes for tests

* updated based on comments

* removed agent decorator

* fix for Python: ServiceResponseException when using multiple tools
Fixes #649

* addressed comments

* fix tests

* fix tests

* fix tools tests

* fix for conversation_id in assistants client

* fix responses test

* fix tests and mypy

* updated test

* foundry fix

---------

Co-authored-by: Chris <66376200+crickman@users.noreply.github.com>
This commit is contained in:
Eduard van Valkenburg
2025-09-10 16:52:42 +02:00
committed by GitHub
Unverified
parent 6aa746d891
commit 82ca4065cb
45 changed files with 3246 additions and 2984 deletions
@@ -13,7 +13,6 @@ async def streaming_with_mcp(show_raw_stream: bool = False) -> None:
through the raw_representation. You can view this, by setting the show_raw_stream parameter to True.
"""
print("=== Tools Defined on Agent Level ===")
# Tools are provided when creating the agent
# The agent can use these tools for any query during its lifetime
async with ChatAgent(
@@ -1,5 +1,13 @@
CONNECTION_STRING="..."
# Connector environment variables
# Foundry
# see ../../../env.example for details
# OpenAI
# see ../../../env.example for details
# Otel specific variables
APPLICATION_INSIGHTS_CONNECTION_STRING="..."
APPLICATION_INSIGHTS_LIVE_METRICS=true
OTLP_ENDPOINT="http://localhost:4317/"
AGENT_FRAMEWORK_GENAI_ENABLE_OTEL_DIAGNOSTICS=true
AGENT_FRAMEWORK_GENAI_ENABLE_OTEL_DIAGNOSTICS_SENSITIVE=true
AGENT_FRAMEWORK_WORKFLOW_ENABLE_OTEL_DIAGNOSTICS=true
ENABLE_OTEL=true
ENABLE_SENSITIVE_DATA=true
WORKFLOW_ENABLE_OTEL=true
@@ -0,0 +1,86 @@
# Copyright (c) Microsoft. All rights reserved.
# type: ignore
import asyncio
import os
from random import randint
from typing import TYPE_CHECKING, Annotated
from agent_framework.openai import OpenAIResponsesClient
from pydantic import Field
if TYPE_CHECKING:
from agent_framework import ChatClientProtocol
"""
This is the simplest sample of using the Agent Framework with telemetry.
Since it does not create a tracer or span in the script's code, we can let the Agent Framework SDK handle everything.
If the environment variables are set correctly,
the SDK will automatically initialize telemetry and collect traces and logs.
"""
if "AGENT_FRAMEWORK_ENABLE_OTEL" not in os.environ:
print("Set AGENT_FRAMEWORK_ENABLE_OTEL to enable telemetry with a OTLP endpoint.")
if "AGENT_FRAMEWORK_OTLP_ENDPOINT" not in os.environ and "AGENT_FRAMEWORK_MONITOR_CONNECTION_STRING" not in os.environ:
print("Set AGENT_FRAMEWORK_OTLP_ENDPOINT or AGENT_FRAMEWORK_MONITOR_CONNECTION_STRING to enable telemetry.")
async def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
await asyncio.sleep(randint(0, 10) / 10.0) # Simulate a network call
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
async def run_chat_client(client: "ChatClientProtocol", stream: bool = False) -> None:
"""Run an AI service.
This function runs an AI service and prints the output.
Telemetry will be collected for the service execution behind the scenes,
and the traces will be sent to the configured telemetry backend.
The telemetry will include information about the AI service execution.
Args:
stream: Whether to use streaming for the plugin
Remarks:
When function calling is outside the open telemetry loop
each of the call to the model is handled as a seperate span,
while when the open telemetry is put last, a single span
is shown, which might include one or more rounds of function calling.
So for the scenario below, you should see the following:
2 spans with gen_ai.operation.name=chat
The first has finish_reason "tool_calls"
The second has finish_reason "stop"
2 spans with gen_ai.operation.name=execute_tool
"""
message = "What's the weather in Amsterdam and in Paris?"
print(f"User: {message}")
if stream:
print("Assistant: ", end="")
async for chunk in client.get_streaming_response(message, tools=get_weather):
if str(chunk):
print(str(chunk), end="")
print("")
else:
response = await client.get_response(message, tools=get_weather)
print(f"Assistant: {response}")
async def main() -> None:
client = OpenAIResponsesClient()
# Scenarios where telemetry is collected in the SDK, from the most basic to the most complex.
await run_chat_client(client, stream=True)
await run_chat_client(client, stream=False)
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,134 @@
# Copyright (c) Microsoft. All rights reserved.
# type: ignore
import argparse
import asyncio
from contextlib import suppress
from random import randint
from typing import TYPE_CHECKING, Annotated, Literal
from agent_framework import __version__, ai_function
from agent_framework.openai import OpenAIResponsesClient
from agent_framework.telemetry import setup_telemetry
from opentelemetry import trace
from opentelemetry.trace import SpanKind
from opentelemetry.trace.span import format_trace_id
from pydantic import Field
if TYPE_CHECKING:
from agent_framework import ChatClientProtocol
"""
This sample, show how you can get telemetry from a chat client and tool.
it explicitly calls the `setup_telemetry` function to set up telemetry in order to include the overall spans,
those are defined in the main and run_* functions.
"""
# Define the scenarios that can be run
SCENARIOS = ["chat_client", "chat_client_stream", "ai_function", "all"]
async def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
await asyncio.sleep(randint(0, 10) / 10.0) # Simulate a network call
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
async def run_chat_client(client: "ChatClientProtocol", stream: bool = False) -> None:
"""Run an AI service.
This function runs an AI service and prints the output.
Telemetry will be collected for the service execution behind the scenes,
and the traces will be sent to the configured telemetry backend.
The telemetry will include information about the AI service execution.
Args:
client: The chat client to use.
stream: Whether to use streaming for the response
Remarks:
For the scenario below, you should see the following:
1 Client span, with 4 children:
2 Internal span with gen_ai.operation.name=chat
The first has finish_reason "tool_calls"
The second has finish_reason "stop"
2 Internal span with gen_ai.operation.name=execute_tool
"""
scenario_name = "Chat Client Stream" if stream else "Chat Client"
tracer = trace.get_tracer("agent_framework", __version__)
with tracer.start_as_current_span(name=f"Scenario: {scenario_name}", kind=SpanKind.CLIENT):
print("Running scenario:", scenario_name)
message = "What's the weather in Amsterdam and in Paris?"
print(f"User: {message}")
if stream:
print("Assistant: ", end="")
async for chunk in client.get_streaming_response(message, tools=get_weather):
if str(chunk):
print(str(chunk), end="")
print("")
else:
response = await client.get_response(message, tools=get_weather)
print(f"Assistant: {response}")
async def run_ai_function() -> None:
"""Run a AI function.
This function runs a AI function and prints the output.
Telemetry will be collected for the function execution behind the scenes,
and the traces will be sent to the configured telemetry backend.
The telemetry will include information about the AI function execution
and the AI service execution.
"""
tracer = trace.get_tracer("agent_framework", __version__)
with tracer.start_as_current_span("Scenario: AI Function", kind=SpanKind.CLIENT):
print("Running scenario: AI Function")
func = ai_function(get_weather)
weather = await func.invoke(location="Amsterdam")
print(f"Weather in Amsterdam:\n{weather}")
async def main(scenario: Literal["chat_client", "chat_client_stream", "ai_function", "all"] = "all"):
"""Run the selected scenario(s)."""
setup_telemetry()
tracer = trace.get_tracer("My application", __version__)
with tracer.start_as_current_span("Sample Scenario's", kind=SpanKind.CLIENT) as current_span:
print(f"Trace ID: {format_trace_id(current_span.get_span_context().trace_id)}")
client = OpenAIResponsesClient()
# Scenarios where telemetry is collected in the SDK, from the most basic to the most complex.
if scenario == "chat_client_stream" or scenario == "all":
with suppress(Exception):
await run_chat_client(client, stream=True)
if scenario == "chat_client" or scenario == "all":
with suppress(Exception):
await run_chat_client(client, stream=False)
if scenario == "ai_function" or scenario == "all":
with suppress(Exception):
await run_ai_function()
if __name__ == "__main__":
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument(
"--scenario",
type=str,
choices=SCENARIOS,
default="all",
help="The scenario to run. Default is all.",
)
args = arg_parser.parse_args()
asyncio.run(main(args.scenario))
@@ -0,0 +1,88 @@
# Copyright (c) Microsoft. All rights reserved.
# type: ignore
import asyncio
import os
from random import randint
from typing import Annotated
from agent_framework import HostedCodeInterpreterTool
from agent_framework.telemetry import setup_telemetry
from agent_framework_foundry import FoundryChatClient
from azure.ai.projects.aio import AIProjectClient
from azure.identity.aio import AzureCliCredential
from opentelemetry import trace
from opentelemetry.trace import SpanKind
from opentelemetry.trace.span import format_trace_id
from pydantic import Field
"""
This sample, shows you can leverage the built-in telemetry in Foundry.
It uses the Foundry client to setup the telemetry, this calls
out to Foundry for a telemetry connection strings,
and then call the setup_telemetry function in the agent framework.
If you want to compare with the trace sent to a generic OTLP endpoint,
switch the `use_foundry_telemetry` variable to False.
"""
# ANSI color codes for printing in blue and resetting after each print
BLUE = "\x1b[34m"
RESET = "\x1b[0m"
async def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
await asyncio.sleep(randint(0, 10) / 10.0) # Simulate a network call
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
async def main() -> None:
"""Run an AI service.
This function runs an AI service and prints the output.
Telemetry will be collected for the service execution behind the scenes,
and the traces will be sent to the configured telemetry backend.
The telemetry will include information about the AI service execution.
In foundry you will also see specific operations happening that are called by the Foundry implementation,
such as `create_agent`.
"""
use_foundry_telemetry = True
questions = [
"What's the weather in Amsterdam and in Paris?",
"Why is the sky blue?",
"Tell me about AI.",
"Can you write a python function that adds two numbers? and use it to add 8483 and 5692?",
]
async with (
AzureCliCredential() as credential,
AIProjectClient(endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"], credential=credential) as project,
FoundryChatClient(client=project, setup_tracing=False) as client,
):
if use_foundry_telemetry:
await client.setup_foundry_telemetry(enable_live_metrics=True)
else:
setup_telemetry()
tracer = trace.get_tracer("agent_framework")
with tracer.start_as_current_span(name="Foundry Telemetry from Agent Framework", kind=SpanKind.CLIENT) as span:
for question in questions:
print(f"{BLUE}User: {question}{RESET}")
print(f"{BLUE}Assistant: {RESET}", end="")
async for chunk in client.get_streaming_response(
question, tools=[get_weather, HostedCodeInterpreterTool()]
):
if str(chunk):
print(f"{BLUE}{str(chunk)}{RESET}", end="")
print(f"{BLUE}{RESET}")
print(f"{BLUE}Done{RESET}")
print(f"{BLUE}Operation ID: {format_trace_id(span.get_span_context().trace_id)}{RESET}")
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,59 @@
# Copyright (c) Microsoft. All rights reserved.
# type: ignore
import asyncio
from random import randint
from typing import Annotated
from agent_framework import ChatAgent
from agent_framework.openai import OpenAIChatClient
from agent_framework.telemetry import setup_telemetry
from opentelemetry import trace
from opentelemetry.trace import SpanKind
from pydantic import Field
"""
This sample shows you can can setup telemetry with a agent.
The agent invoke is a additional Semantic Convention that now
will wrap the calls made by the underlying chat client and tools.
"""
async def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
await asyncio.sleep(randint(0, 10) / 10.0) # Simulate a network call
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
async def main():
# Set up the telemetry
setup_telemetry()
questions = ["What's the weather in Amsterdam?", "and in Paris, and which is better?", "Why is the sky blue?"]
tracer = trace.get_tracer("agent_framework")
with tracer.start_as_current_span("Scenario: Agent Chat", kind=SpanKind.CLIENT):
print("Running scenario: Agent Chat")
print("Welcome to the chat, type 'exit' to quit.")
agent = ChatAgent(
chat_client=OpenAIChatClient(),
tools=get_weather,
name="WeatherAgent",
instructions="You are a weather assistant.",
)
thread = agent.get_new_thread()
for question in questions:
print(f"User: {question}")
print(f"{agent.display_name}: ", end="")
async for update in agent.run_stream(
question,
thread=thread,
):
if update.text:
print(update.text, end="")
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,65 @@
# Copyright (c) Microsoft. All rights reserved.
# type: ignore
import asyncio
import os
from random import randint
from typing import Annotated
from agent_framework import ChatAgent
from agent_framework_foundry import FoundryChatClient
from azure.ai.projects.aio import AIProjectClient
from azure.identity.aio import AzureCliCredential
from opentelemetry import trace
from opentelemetry.trace import SpanKind
from pydantic import Field
"""
This sample shows you can can setup telemetry with a agent from Foundry.
We once again call the `setup_foundry_telemetry` method to set up telemetry in order to include the overall spans.
"""
async def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
await asyncio.sleep(randint(0, 10) / 10.0) # Simulate a network call
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
async def main():
# Set up the providers
# This must be done before any other telemetry calls
questions = ["What's the weather in Amsterdam?", "and in Paris, and which is better?", "Why is the sky blue?"]
async with (
AzureCliCredential() as credential,
AIProjectClient(endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"], credential=credential) as project,
# this calls `setup_foundry_telemetry` through the context manager
FoundryChatClient(client=project) as client,
):
await client.setup_foundry_telemetry(enable_live_metrics=True)
tracer = trace.get_tracer("agent_framework")
with tracer.start_as_current_span("Single Agent Chat", kind=SpanKind.CLIENT):
print("Running Single Agent Chat")
print("Welcome to the chat, type 'exit' to quit.")
agent = ChatAgent(
chat_client=client,
tools=get_weather,
name="WeatherAgent",
instructions="You are a weather assistant.",
)
thread = agent.get_new_thread()
for question in questions:
print(f"User: {question}")
print(f"{agent.display_name}: ", end="")
async for update in agent.run_stream(
question,
thread=thread,
):
if update.text:
print(update.text, end="")
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,120 @@
# Copyright (c) Microsoft. All rights reserved.
# type: ignore
import asyncio
from typing import Any
from agent_framework.telemetry import setup_telemetry
from agent_framework.workflow import (
Executor,
WorkflowBuilder,
WorkflowCompletedEvent,
WorkflowContext,
handler,
)
from opentelemetry import trace
from opentelemetry.trace import SpanKind
from opentelemetry.trace.span import format_trace_id
"""Telemetry sample demonstrating OpenTelemetry integration with Agent Framework workflows.
This sample runs a simple sequential workflow with telemetry collection,
showing telemetry collection for workflow execution, executor processing,
and message publishing between executors.
"""
# Executors for sequential workflow
class UpperCaseExecutor(Executor):
"""An executor that converts text to uppercase."""
@handler
async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
"""Execute the task by converting the input string to uppercase."""
print(f"UpperCaseExecutor: Processing '{text}'")
result = text.upper()
print(f"UpperCaseExecutor: Result '{result}'")
# Send the result to the next executor in the workflow.
await ctx.send_message(result)
class ReverseTextExecutor(Executor):
"""An executor that reverses text."""
@handler
async def reverse_text(self, text: str, ctx: WorkflowContext[Any]) -> None:
"""Execute the task by reversing the input string."""
print(f"ReverseTextExecutor: Processing '{text}'")
result = text[::-1]
print(f"ReverseTextExecutor: Result '{result}'")
# Send the result with a workflow completion event.
await ctx.add_event(WorkflowCompletedEvent(result))
async def run_sequential_workflow() -> None:
"""Run a simple sequential workflow demonstrating telemetry collection.
This workflow processes a string through two executors in sequence:
1. UpperCaseExecutor converts the input to uppercase
2. ReverseTextExecutor reverses the string and completes the workflow
Telemetry data collected includes:
- Overall workflow execution spans
- Individual executor processing spans
- Message publishing between executors
- Workflow completion events
"""
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("Scenario: Sequential Workflow", kind=SpanKind.CLIENT) as current_span:
print("Running scenario: Sequential Workflow")
try:
# Step 1: Create the executors.
upper_case_executor = UpperCaseExecutor(id="upper_case_executor")
reverse_text_executor = ReverseTextExecutor(id="reverse_text_executor")
# Step 2: Build the workflow with the defined edges.
workflow = (
WorkflowBuilder()
.add_edge(upper_case_executor, reverse_text_executor)
.set_start_executor(upper_case_executor)
.build()
)
# Step 3: Run the workflow with an initial message.
input_text = "hello world"
print(f"Starting workflow with input: '{input_text}'")
completion_event = None
async for event in workflow.run_stream(input_text):
print(f"Event: {event}")
if isinstance(event, WorkflowCompletedEvent):
# The WorkflowCompletedEvent contains the final result.
completion_event = event
if completion_event:
print(f"Workflow completed with result: '{completion_event.data}'")
else:
print("Workflow completed without a completion event")
except Exception as e:
current_span.record_exception(e)
print(f"Error running workflow: {e}")
async def main():
"""Run the telemetry sample with a simple sequential workflow."""
setup_telemetry()
tracer = trace.get_tracer("agent_framework")
with tracer.start_as_current_span("Sequential Workflow Scenario", kind=SpanKind.CLIENT) as current_span:
print(f"Trace ID: {format_trace_id(current_span.get_span_context().trace_id)}")
# Run the sequential workflow scenario
await run_sequential_workflow()
if __name__ == "__main__":
asyncio.run(main())
@@ -1,8 +1,8 @@
# Agent Framework Python Telemetry
This sample project shows how a Python application can be configured to send Agent Framework telemetry to the Application Performance Management (APM) vendors of your choice.
This sample folder shows how a Python application can be configured to send Agent Framework telemetry to the Application Performance Management (APM) vendors of your choice.
In this sample, we provide options to send telemetry to [Application Insights](https://learn.microsoft.com/en-us/azure/azure-monitor/app/app-insights-overview), [Aspire Dashboard](https://learn.microsoft.com/en-us/dotnet/aspire/fundamentals/dashboard/overview?tabs=bash), and console output.
In this sample, we provide options to send telemetry to [Application Insights](https://learn.microsoft.com/en-us/azure/azure-monitor/app/app-insights-overview) and [Aspire Dashboard](https://learn.microsoft.com/en-us/dotnet/aspire/fundamentals/dashboard/overview?tabs=bash).
> **Quick Start**: For local development without Azure setup, you can use the [Aspire Dashboard](https://learn.microsoft.com/en-us/dotnet/aspire/fundamentals/dashboard/standalone) which runs locally via Docker and provides an excellent telemetry viewing experience for OpenTelemetry data.
@@ -24,6 +24,7 @@ The Agent Framework Python SDK is designed to efficiently generate comprehensive
### Required resources
2. OpenAI or [Azure OpenAI](https://learn.microsoft.com/en-us/azure/ai-services/openai/how-to/create-resource?pivots=web-portal)
2. [Foundry project](https://ai.azure.com/doc/azure/ai-foundry/what-is-azure-ai-foundry)
### Optional resources
@@ -31,61 +32,51 @@ The Agent Framework Python SDK is designed to efficiently generate comprehensive
2. [Aspire Dashboard](https://learn.microsoft.com/en-us/dotnet/aspire/fundamentals/dashboard/standalone-for-python?tabs=flask%2Cwindows#start-the-aspire-dashboard)
### Dependencies
No additional dependencies are required to enable telemetry. The necessary packages are included as part of the `agent-framework` package. Unless you want to use a different APM vendor, in which case you will need to install the appropriate OpenTelemetry exporter package.
You will also need to install the following dependencies to your virtual environment to run this sample:
### Environment variables
The following environment variables can be set to configure telemetry, the first two set the basic configuration:
```bash
# For Azure ApplicationInsights/AzureMonitor
uv pip install azure-monitor-opentelemetry azure-monitor-opentelemetry-exporter
# For OTLP endpoint
uv pip install opentelemetry-exporter-otlp-proto-grpc
```
- AGENT_FRAMEWORK_ENABLE_OTEL=true
- AGENT_FRAMEWORK_ENABLE_SENSITIVE_DATA=true
## Running the sample
Next we need to know where to send the telemetry, for that you can use either a OTLP endpoint or a connection string for Application Insights:
- AGENT_FRAMEWORK_OTLP_ENDPOINT="<url to OTLP endpoint>"
or
- AGENT_FRAMEWORK_MONITOR_CONNECTION_STRING="<connection string>"
Finally, you can enable live metrics streaming to Application Insights:
- AGENT_FRAMEWORK_MONITOR_LIVE_METRICS=true
> IMPORTANT - If both OTLP endpoint and connection string are set, the connection string will take precedence and there will be no trace to the OTLP endpoint.
## Samples
This folder contains different samples demonstrating how to use telemetry in various scenarios.
### [01 - zero_code](./01-zero_code.py):
A simple example showing how to enable telemetry in a zero-touch scenario. When the above environment variables are set, telemetry will be automatically enabled, however since you do not define any overarching tracer, you will only see the spans for the specific calls to the chat client and tools.
### [02a](./02a-generic_chat_client.py) and [02b](./02b-foundry_chat_client.py) Chat Clients:
These two samples show how to first setup the telemetry by manually importing the `setup_telemetry` function from the `agent_framework.telemetry` module and calling it. After this is done, the trace that get's created will live in the same context as the chat client calls, allowing you to see the end-to-end flow of your application. For Foundry, there is a method in the Foundry project client to get the telemetry url for your project, the `.setup_foundry_telemetry()` method in the `FoundryChatClient` class will use this url to configure telemetry and you then do not have to import and call `setup_telemetry()` manually.
Because of the way OpenTelemetry works, you can only call `setup_telemetry()` once per application run, so make sure you do that in the right place.
### [03a](./03a-generic_agent.py) and [03b](./03b-foundry_agent.py) Agents:
These two samples show how to setup telemetry when using the Agent Framework's agent abstraction layer. They are similar to the chat client samples, but also show how to create an agent and invoke it. The same rules apply for setting up telemetry, you can either call `setup_telemetry()` manually, or use the `setup_foundry_telemetry()` method in the `FoundryChatClient` class.
### [04 - workflow](./04-workflow.py) Workflow:
This sample shows how to setup telemetry when using the Agent Framework's workflow execution engine. It demonstrates a simple workflow scenario with telemetry.
## Running the samples
1. Open a terminal and navigate to this folder: `python/samples/getting_started/telemetry/`. This is necessary for the `.env` file to be read correctly.
2. Create a `.env` file if one doesn't already exist in this folder. Please refer to the [example file](./.env.example).
> Note that `CONNECTION_STRING` and `SAMPLE_OTLP_ENDPOINT` are optional. If you don't configure them, everything will get outputted to the console.
> Set `AGENT_FRAMEWORK_GENAI_ENABLE_OTEL_DIAGNOSTICS=true` to enable basic telemetry and `AGENT_FRAMEWORK_GENAI_ENABLE_OTEL_DIAGNOSTICS_SENSITIVE=true` to include sensitive information like prompts and responses.
> Set `AGENT_FRAMEWORK_WORKFLOW_ENABLE_OTEL_DIAGNOSTICS=true` to enable workflow telemetry for the workflow samples.
> Set `AGENT_FRAMEWORK_ENABLE_OTEL=true` to enable basic telemetry and `AGENT_FRAMEWORK_ENABLE_SENSITIVE_DATA=true` to include sensitive information like prompts and responses.
> Sensitive information should only be enabled in a development or test environment. It is not recommended to enable this in production environments as it may expose sensitive data.
3. Activate your python virtual environment, and then run `python scenarios.py`, `python interactive.py`, `python agent.py`, or `python workflow.py`.
> Set `AGENT_FRAMEWORK_WORKFLOW_ENABLE_OTEL=true` to enable workflow telemetry for the workflow samples.
3. Activate your python virtual environment, and then run `python 01-zero_code.py` or others.
> This will output the Operation/Trace ID, which can be used later for filtering.
### Scenarios
This sample includes multiple applications demonstrating Agent Framework telemetry:
#### scenarios.py
Organized into specific scenarios where the framework will generate useful telemetry data:
- `chat_client`: This is when a chat client is invoked directly (i.e. not streaming) with a weather tool function. **Information about the call to the underlying model and tool usage will be recorded**.
- `chat_client_stream`: This is when a chat client is invoked with streaming enabled and a weather tool function. **Information about the streaming call to the underlying model and tool usage will be recorded**.
- `ai_function`: This is when an AI function (`get_weather`) is invoked directly. **Information about the AI function and the call to the underlying model will be recorded**.
By default, running `python scenarios.py` will run all three scenarios. To run individual scenarios, use the `--scenario` command line argument. For example, `python scenarios.py --scenario chat_client`. For more information, please run `python scenarios.py -h`.
#### interactive.py
An interactive chat application that demonstrates telemetry collection in a conversational context. This sample includes the same `get_weather` tool function and allows for multi-turn conversations. Run `python interactive.py` and start chatting. Type 'exit' to quit the application. This sample only logs at the `WARNING` level, so you will not see as much telemetry data as in the `scenarios.py` sample.
#### agent.py
A sample demonstrating Agent Framework telemetry collection for agent-based workflows. This shows how telemetry is captured when using the Agent Framework's agent abstraction layer, including agent initialization, message processing, and tool execution within an agent context.
By default, running `python agent.py` will run all agent scenarios. To run individual scenarios, use the `--scenario` command line argument. For example, `python agent.py --scenario basic`. For more information, please run `python agent.py -h`.
#### workflow.py
A sample demonstrating workflow telemetry collection for the Agent Framework's workflow execution engine. This includes two scenarios:
- `sequential`: A simple sequential workflow that processes text through two connected executors (uppercase conversion followed by text reversal). **Information about workflow execution, executor processing, and message passing between executors will be recorded**.
- `sub_workflow`: A more complex scenario demonstrating sub-workflow patterns with a parent workflow orchestrating multiple text processing tasks via sub-workflows. **Information about parent workflow execution, sub-workflow invocation, and cross-workflow communication will be recorded**.
By default, running `python workflow.py` will run all workflow scenarios. To run individual scenarios, use the `--scenario` command line argument. For example, `python workflow.py --scenario sequential`. For more information, please run `python workflow.py -h`.
## Application Insights/Azure Monitor
### Logs and traces
@@ -151,13 +142,13 @@ This will start the dashboard with:
Make sure your `.env` file includes the OTLP endpoint:
```bash
OTLP_ENDPOINT=http://localhost:4317
AGENT_FRAMEWORK_OTLP_ENDPOINT=http://localhost:4317
```
Or set it as an environment variable when running your samples:
```bash
OTLP_ENDPOINT=http://localhost:4317 python scenarios.py
AGENT_FRAMEWORK_ENABLE_OTEL=true AGENT_FRAMEWORK_OTLP_ENDPOINT=http://localhost:4317 python 01-zero_code.py
```
### Viewing telemetry data
@@ -170,138 +161,4 @@ Once your sample finishes running, navigate to <http://localhost:18888> in a web
You won't have to deploy an Application Insights resource or install Docker to run Aspire Dashboard if you choose to inspect telemetry data in a console. However, it is difficult to navigate through all the spans and logs produced, so **this method is only recommended when you are just getting started**.
We recommend you to get started with the `chat_client` scenario as this generates the least amount of telemetry data. Below is similar to what you will see when you run `python scenarios.py --scenario chat_client`:
```Json
{
"name": "chat.completions gpt-4o",
"context": {
"trace_id": "0xbda1d9efcd65435653d18fa37aef7dd3",
"span_id": "0xcd443e1917510385",
"trace_state": "[]"
},
"kind": "SpanKind.INTERNAL",
"parent_id": "0xeca0a2ca7b7a8191",
"start_time": "2024-09-09T23:13:14.625156Z",
"end_time": "2024-09-09T23:13:17.311909Z",
"status": {
"status_code": "UNSET"
},
"attributes": {
"gen_ai.operation.name": "chat.completions",
"gen_ai.system": "openai",
"gen_ai.request.model": "gpt-4o",
"gen_ai.response.id": "chatcmpl-A5hrG13nhtFsOgx4ziuoskjNscHtT",
"gen_ai.response.finish_reason": "FinishReason.STOP",
"gen_ai.response.prompt_tokens": 16,
"gen_ai.response.completion_tokens": 28
},
"events": [
{
"name": "gen_ai.content.prompt",
"timestamp": "2024-09-09T23:13:14.625156Z",
"attributes": {
"gen_ai.prompt": "[{\"role\": \"user\", \"content\": \"Why is the sky blue in one sentence?\"}]"
}
},
{
"name": "gen_ai.content.completion",
"timestamp": "2024-09-09T23:13:17.311909Z",
"attributes": {
"gen_ai.completion": "[{\"role\": \"assistant\", \"content\": \"The sky appears blue because molecules in the Earth's atmosphere scatter shorter wavelengths of sunlight, such as blue, more effectively than longer wavelengths like red.\"}]"
}
}
],
"links": [],
"resource": {
"attributes": {
"telemetry.sdk.language": "python",
"telemetry.sdk.name": "opentelemetry",
"telemetry.sdk.version": "1.26.0",
"service.name": "TelemetryExample"
},
"schema_url": ""
}
}
{
"name": "Scenario: Chat Client",
"context": {
"trace_id": "0xbda1d9efcd65435653d18fa37aef7dd3",
"span_id": "0xeca0a2ca7b7a8191",
"trace_state": "[]"
},
"kind": "SpanKind.INTERNAL",
"parent_id": "0x48af7ad55f2f64b5",
"start_time": "2024-09-09T23:13:14.625156Z",
"end_time": "2024-09-09T23:13:17.312910Z",
"status": {
"status_code": "UNSET"
},
"attributes": {},
"events": [],
"links": [],
"resource": {
"attributes": {
"telemetry.sdk.language": "python",
"telemetry.sdk.name": "opentelemetry",
"telemetry.sdk.version": "1.26.0",
"service.name": "TelemetryExample"
},
"schema_url": ""
}
}
{
"name": "Scenario's",
"context": {
"trace_id": "0xbda1d9efcd65435653d18fa37aef7dd3",
"span_id": "0x48af7ad55f2f64b5",
"trace_state": "[]"
},
"kind": "SpanKind.INTERNAL",
"parent_id": null,
"start_time": "2024-09-09T23:13:13.840481Z",
"end_time": "2024-09-09T23:13:17.312910Z",
"status": {
"status_code": "UNSET"
},
"attributes": {},
"events": [],
"links": [],
"resource": {
"attributes": {
"telemetry.sdk.language": "python",
"telemetry.sdk.name": "opentelemetry",
"telemetry.sdk.version": "1.26.0",
"service.name": "TelemetryExample"
},
"schema_url": ""
}
}
{
"body": "Agent Framework usage: CompletionUsage(completion_tokens=28, prompt_tokens=16, total_tokens=44)",
"severity_number": "<SeverityNumber.INFO: 9>",
"severity_text": "INFO",
"attributes": {
"code.filepath": "/path/to/agent_framework/openai/chat_client.py",
"code.function": "store_usage",
"code.lineno": 81
},
"dropped_attributes": 0,
"timestamp": "2024-09-09T23:13:17.311909Z",
"observed_timestamp": "2024-09-09T23:13:17.311909Z",
"trace_id": "0xbda1d9efcd65435653d18fa37aef7dd3",
"span_id": "0xcd443e1917510385",
"trace_flags": 1,
"resource": {
"attributes": {
"telemetry.sdk.language": "python",
"telemetry.sdk.name": "opentelemetry",
"telemetry.sdk.version": "1.26.0",
"service.name": "TelemetryExample"
},
"schema_url": ""
}
}
```
In the output, you will find three spans: `Scenario's`, `Scenario: Chat Client`, and `chat.completions gpt-4o`, each representing a different layer in the sample. In particular, `chat.completions gpt-4o` is generated by the chat client. Inside it, you will find information about the call, such as the timestamp of the operation, the response id and the finish reason. You will also find sensitive information such as the prompt and response to and from the model (only if you have `AGENT_FRAMEWORK__GENAI_ENABLE_OTEL_DIAGNOSTICS_SENSITIVE` set to true). If you use Application Insights or Aspire Dashboard, these information will be available to you in an interactive UI.
Use the guides from OpenTelemetry to setup exporters for [the console](https://opentelemetry.io/docs/languages/python/getting-started/), or use [manual_setup_console_output](./manual_setup_console_output.py) as a reference, just know that there are a lot of options you can setup and this is not a comprehensive example.
@@ -1,190 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
# type: ignore
import asyncio
import logging
from random import randint
from typing import Annotated
from agent_framework import ChatAgent
from agent_framework.openai import OpenAIChatClient
from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry import trace
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.view import DropAggregation, View
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.semconv.attributes import service_attributes
from opentelemetry.trace import SpanKind, set_tracer_provider
from pydantic import Field
from pydantic_settings import BaseSettings
class TelemetrySampleSettings(BaseSettings):
"""Settings for the telemetry sample application.
Optional settings are:
- connection_string: str - The connection string for the Application Insights resource.
This value can be found in the Overview section when examining
your resource from the Azure portal.
(Env var CONNECTION_STRING)
- otlp_endpoint: str - The OTLP endpoint to send telemetry data to.
Depending on the exporter used, you may find this value in different places.
(Env var OTLP_ENDPOINT)
If no connection string or OTLP endpoint is provided, the telemetry data will be
exported to the console.
"""
connection_string: str | None = None
otlp_endpoint: str | None = None
# Load settings
settings = TelemetrySampleSettings()
# Create a resource to represent the service/sample
resource = Resource.create({service_attributes.SERVICE_NAME: "TelemetryExample"})
# Define the scenarios that can be run
SCENARIOS = ["ai_service", "kernel_function", "auto_function_invocation", "all"]
if settings.connection_string:
configure_azure_monitor(
connection_string=settings.connection_string, enable_live_metrics=True, logger_name="agent_framework"
)
def set_up_logging():
class LogFilter(logging.Filter):
"""A filter to not process records from several subpackages."""
# These are the namespaces that we want to exclude from logging for the purposes of this demo.
namespaces_to_exclude: list[str] = [
"httpx",
"openai",
]
def filter(self, record):
return not any([record.name.startswith(namespace) for namespace in self.namespaces_to_exclude])
exporters = []
if settings.otlp_endpoint:
exporters.append(OTLPLogExporter(endpoint=settings.otlp_endpoint))
if not exporters:
exporters.append(ConsoleLogExporter())
# Create and set a global logger provider for the application.
logger_provider = LoggerProvider(resource=resource)
# Log processors are initialized with an exporter which is responsible
# for sending the telemetry data to a particular backend.
for log_exporter in exporters:
logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
# Sets the global default logger provider
set_logger_provider(logger_provider)
# Create a logging handler to write logging records, in OTLP format, to the exporter.
handler = LoggingHandler()
handler.addFilter(LogFilter())
# Attach the handler to the root logger. `getLogger()` with no arguments returns the root logger.
# Events from all child loggers will be processed by this handler.
logger = logging.getLogger()
logger.addHandler(handler)
# Set the logging level to INFO.
logger.setLevel(logging.INFO)
def set_up_tracing():
exporters = []
if settings.otlp_endpoint:
exporters.append(OTLPSpanExporter(endpoint=settings.otlp_endpoint))
if not exporters:
exporters.append(ConsoleSpanExporter())
# Initialize a trace provider for the application. This is a factory for creating tracers.
tracer_provider = TracerProvider(resource=resource)
# Span processors are initialized with an exporter which is responsible
# for sending the telemetry data to a particular backend.
for exporter in exporters:
tracer_provider.add_span_processor(BatchSpanProcessor(exporter))
# Sets the global default tracer provider
set_tracer_provider(tracer_provider)
def set_up_metrics():
exporters = []
if settings.otlp_endpoint:
exporters.append(OTLPMetricExporter(endpoint=settings.otlp_endpoint))
if not exporters:
exporters.append(ConsoleMetricExporter())
# Initialize a metric provider for the application. This is a factory for creating meters.
metric_readers = [
PeriodicExportingMetricReader(metric_exporter, export_interval_millis=5000) for metric_exporter in exporters
]
meter_provider = MeterProvider(
metric_readers=metric_readers,
resource=resource,
views=[
# Dropping all instrument names except for those starting with "agent_framework"
View(instrument_name="*", aggregation=DropAggregation()),
View(instrument_name="agent_framework*"),
],
)
# Sets the global default meter provider
set_meter_provider(meter_provider)
async def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
await asyncio.sleep(randint(0, 10) / 10.0) # Simulate a network call
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
async def main():
# Set up the providers
# This must be done before any other telemetry calls
set_up_logging()
set_up_tracing()
set_up_metrics()
tracer = trace.get_tracer("agent_framework")
with tracer.start_as_current_span("Scenario: Agent Chat", kind=SpanKind.CLIENT) as current_span:
print("Running scenario: Agent Chat")
print("Welcome to the chat, type 'exit' to quit.")
agent = ChatAgent(
chat_client=OpenAIChatClient(),
tools=get_weather,
name="WeatherAgent",
instructions="You are a weather assistant.",
)
thread = agent.get_new_thread()
message = input("User: ")
try:
while message.lower() != "exit":
print(f"{agent.display_name}: ", end="")
async for update in agent.run_stream(
message,
thread=thread,
):
if update.text:
print(update.text, end="")
message = input("\nUser: ")
except Exception as e:
current_span.record_exception(e)
print(f"\nError running interactive chat: {e}")
if __name__ == "__main__":
asyncio.run(main())
@@ -1,186 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
# type: ignore
import asyncio
import logging
from random import randint
from typing import Annotated
from agent_framework import ChatMessage, ChatResponse, ChatResponseUpdate
from agent_framework.openai import OpenAIChatClient
from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry import trace
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.view import DropAggregation, View
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.semconv.attributes import service_attributes
from opentelemetry.trace import SpanKind, set_tracer_provider
from pydantic import Field
from pydantic_settings import BaseSettings
class TelemetrySampleSettings(BaseSettings):
"""Settings for the telemetry sample application.
Optional settings are:
- connection_string: str - The connection string for the Application Insights resource.
This value can be found in the Overview section when examining
your resource from the Azure portal.
(Env var CONNECTION_STRING)
- otlp_endpoint: str - The OTLP endpoint to send telemetry data to.
Depending on the exporter used, you may find this value in different places.
(Env var OTLP_ENDPOINT)
If no connection string or OTLP endpoint is provided, the telemetry data will be
exported to the console.
"""
connection_string: str | None = None
otlp_endpoint: str | None = None
# Load settings
settings = TelemetrySampleSettings()
# Create a resource to represent the service/sample
resource = Resource.create({service_attributes.SERVICE_NAME: "TelemetryExample"})
# Define the scenarios that can be run
SCENARIOS = ["ai_service", "kernel_function", "auto_function_invocation", "all"]
if settings.connection_string:
configure_azure_monitor(
connection_string=settings.connection_string, enable_live_metrics=True, logger_name="agent_framework"
)
def set_up_logging():
class LogFilter(logging.Filter):
"""A filter to not process records from several subpackages."""
# These are the namespaces that we want to exclude from logging for the purposes of this demo.
namespaces_to_exclude: list[str] = [
"httpx",
"openai",
]
def filter(self, record):
return not any([record.name.startswith(namespace) for namespace in self.namespaces_to_exclude])
exporters = []
if settings.otlp_endpoint:
exporters.append(OTLPLogExporter(endpoint=settings.otlp_endpoint))
if not exporters:
exporters.append(ConsoleLogExporter())
# Create and set a global logger provider for the application.
logger_provider = LoggerProvider(resource=resource)
# Log processors are initialized with an exporter which is responsible
# for sending the telemetry data to a particular backend.
for log_exporter in exporters:
logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
# Sets the global default logger provider
set_logger_provider(logger_provider)
# Create a logging handler to write logging records, in OTLP format, to the exporter.
handler = LoggingHandler()
handler.addFilter(LogFilter())
# Attach the handler to the root logger. `getLogger()` with no arguments returns the root logger.
# Events from all child loggers will be processed by this handler.
logger = logging.getLogger()
logger.addHandler(handler)
# Set the logging level to WARNING, this will not log detailed events to the logger.
logger.setLevel(logging.WARNING)
def set_up_tracing():
exporters = []
if settings.otlp_endpoint:
exporters.append(OTLPSpanExporter(endpoint=settings.otlp_endpoint))
if not exporters:
exporters.append(ConsoleSpanExporter())
# Initialize a trace provider for the application. This is a factory for creating tracers.
tracer_provider = TracerProvider(resource=resource)
# Span processors are initialized with an exporter which is responsible
# for sending the telemetry data to a particular backend.
for exporter in exporters:
tracer_provider.add_span_processor(BatchSpanProcessor(exporter))
# Sets the global default tracer provider
set_tracer_provider(tracer_provider)
def set_up_metrics():
exporters = []
if settings.otlp_endpoint:
exporters.append(OTLPMetricExporter(endpoint=settings.otlp_endpoint))
if not exporters:
exporters.append(ConsoleMetricExporter())
# Initialize a metric provider for the application. This is a factory for creating meters.
metric_readers = [
PeriodicExportingMetricReader(metric_exporter, export_interval_millis=5000) for metric_exporter in exporters
]
meter_provider = MeterProvider(
metric_readers=metric_readers,
resource=resource,
views=[
# Dropping all instrument names except for those starting with "agent_framework"
View(instrument_name="*", aggregation=DropAggregation()),
View(instrument_name="agent_framework*"),
],
)
# Sets the global default meter provider
set_meter_provider(meter_provider)
def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
async def main():
# Set up the providers
# This must be done before any other telemetry calls
set_up_logging()
set_up_tracing()
set_up_metrics()
tracer = trace.get_tracer("agent_framework")
with tracer.start_as_current_span("Scenario: Interactive Chat", kind=SpanKind.CLIENT) as current_span:
print("Running scenario: Interactive Chat")
print("Welcome to the chat, type 'exit' to quit.")
client = OpenAIChatClient()
messages: list[ChatMessage] = []
message = input("User: ")
try:
while message.lower() != "exit":
messages.append(ChatMessage(role="user", text=message))
print("Assistant: ", end="")
updates: list[ChatResponseUpdate] = []
async for update in client.get_streaming_response(messages, tools=get_weather):
updates.append(update)
if update.text:
print(update.text, end="")
print("")
messages.extend(ChatResponse.from_chat_response_updates(updates).messages)
message = input("User: ")
except Exception as e:
current_span.record_exception(e)
print(f"\nError running interactive chat: {e}")
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,114 @@
# Copyright (c) Microsoft. All rights reserved.
# type: ignore
import asyncio
import logging
from random import randint
from typing import Annotated
from agent_framework.openai import OpenAIChatClient
from opentelemetry._logs import set_logger_provider
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.trace import set_tracer_provider
from pydantic import Field
"""
This sample shows how to manually set up OpenTelemetry to log to the console.
And this can also be used as a reference for more complex telemetry setups.
"""
resource = Resource.create({ResourceAttributes.SERVICE_NAME: "ManualSetup"})
def setup_console_telemetry():
# Create and set a global logger provider for the application.
logger_provider = LoggerProvider(resource=resource)
# Log processors are initialized with an exporter which is responsible
logger_provider.add_log_record_processor(BatchLogRecordProcessor(ConsoleLogExporter()))
# Sets the global default logger provider
set_logger_provider(logger_provider)
# Create a logging handler to write logging records, in OTLP format, to the exporter.
handler = LoggingHandler()
# Attach the handler to the root logger. `getLogger()` with no arguments returns the root logger.
# Events from all child loggers will be processed by this handler.
logger = logging.getLogger()
logger.addHandler(handler)
# Set the logging level to NOTSET to allow all records to be processed by the handler.
logger.setLevel(logging.NOTSET)
# Initialize a trace provider for the application. This is a factory for creating tracers.
tracer_provider = TracerProvider(resource=resource)
# Span processors are initialized with an exporter which is responsible
# for sending the telemetry data to a particular backend.
tracer_provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
# Sets the global default tracer provider
set_tracer_provider(tracer_provider)
# Initialize a metric provider for the application. This is a factory for creating meters.
meter_provider = MeterProvider(
metric_readers=[PeriodicExportingMetricReader(ConsoleMetricExporter(), export_interval_millis=5000)],
resource=resource,
)
# Sets the global default meter provider
set_meter_provider(meter_provider)
async def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
await asyncio.sleep(randint(0, 10) / 10.0) # Simulate a network call
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
async def run_chat_client() -> None:
"""Run an AI service.
This function runs an AI service and prints the output.
Telemetry will be collected for the service execution behind the scenes,
and the traces will be sent to the configured telemetry backend.
The telemetry will include information about the AI service execution.
Args:
stream: Whether to use streaming for the plugin
Remarks:
When function calling is outside the open telemetry loop
each of the call to the model is handled as a seperate span,
while when the open telemetry is put last, a single span
is shown, which might include one or more rounds of function calling.
So for the scenario below, you should see the following:
2 spans with gen_ai.operation.name=chat
The first has finish_reason "tool_calls"
The second has finish_reason "stop"
2 spans with gen_ai.operation.name=execute_tool
"""
client = OpenAIChatClient()
message = "What's the weather in Amsterdam and in Paris?"
print(f"User: {message}")
print("Assistant: ", end="")
async for chunk in client.get_streaming_response(message, tools=get_weather):
if str(chunk):
print(str(chunk), end="")
print("")
async def main():
"""Run the selected scenario(s)."""
setup_console_telemetry()
await run_chat_client()
if __name__ == "__main__":
asyncio.run(main())
@@ -1,249 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
# type: ignore
import argparse
import asyncio
import logging
from random import randint
from typing import Annotated, Literal
from agent_framework import ai_function
from agent_framework.openai import OpenAIChatClient
from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry import trace
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.view import DropAggregation, View
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.semconv.attributes import service_attributes
from opentelemetry.trace import SpanKind, set_tracer_provider
from opentelemetry.trace.span import format_trace_id
from pydantic import Field
from pydantic_settings import BaseSettings
class TelemetrySampleSettings(BaseSettings):
"""Settings for the telemetry sample application.
Optional settings are:
- connection_string: str - The connection string for the Application Insights resource.
This value can be found in the Overview section when examining
your resource from the Azure portal.
(Env var CONNECTION_STRING)
- otlp_endpoint: str - The OTLP endpoint to send telemetry data to.
Depending on the exporter used, you may find this value in different places.
(Env var OTLP_ENDPOINT)
If no connection string or OTLP endpoint is provided, the telemetry data will be
exported to the console.
"""
connection_string: str | None = None
otlp_endpoint: str | None = None
# Load settings
settings = TelemetrySampleSettings()
# Create a resource to represent the service/sample
resource = Resource.create({service_attributes.SERVICE_NAME: "TelemetryExample"})
# Define the scenarios that can be run
SCENARIOS = ["chat_client", "chat_client_stream", "ai_function", "all"]
if settings.connection_string:
configure_azure_monitor(
connection_string=settings.connection_string,
enable_live_metrics=True,
logger_name="agent_framework",
)
def set_up_logging():
class LogFilter(logging.Filter):
"""A filter to not process records from several subpackages."""
# These are the namespaces that we want to exclude from logging for the purposes of this demo.
namespaces_to_exclude: list[str] = [
"httpx",
"openai",
]
def filter(self, record):
return not any([record.name.startswith(namespace) for namespace in self.namespaces_to_exclude])
exporters = []
if settings.otlp_endpoint:
exporters.append(OTLPLogExporter(endpoint=settings.otlp_endpoint))
if not exporters:
exporters.append(ConsoleLogExporter())
# Create and set a global logger provider for the application.
logger_provider = LoggerProvider(resource=resource)
# Log processors are initialized with an exporter which is responsible
# for sending the telemetry data to a particular backend.
for log_exporter in exporters:
logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
# Sets the global default logger provider
set_logger_provider(logger_provider)
# Create a logging handler to write logging records, in OTLP format, to the exporter.
handler = LoggingHandler()
handler.addFilter(LogFilter())
# Attach the handler to the root logger. `getLogger()` with no arguments returns the root logger.
# Events from all child loggers will be processed by this handler.
logger = logging.getLogger()
logger.addHandler(handler)
# Set the logging level to NOTSET to allow all records to be processed by the handler.
logger.setLevel(logging.NOTSET)
def set_up_tracing():
exporters = []
if settings.otlp_endpoint:
exporters.append(OTLPSpanExporter(endpoint=settings.otlp_endpoint))
if not exporters:
exporters.append(ConsoleSpanExporter())
# Initialize a trace provider for the application. This is a factory for creating tracers.
tracer_provider = TracerProvider(resource=resource)
# Span processors are initialized with an exporter which is responsible
# for sending the telemetry data to a particular backend.
for exporter in exporters:
tracer_provider.add_span_processor(BatchSpanProcessor(exporter))
# Sets the global default tracer provider
set_tracer_provider(tracer_provider)
def set_up_metrics():
exporters = []
if settings.otlp_endpoint:
exporters.append(OTLPMetricExporter(endpoint=settings.otlp_endpoint))
if not exporters:
exporters.append(ConsoleMetricExporter())
# Initialize a metric provider for the application. This is a factory for creating meters.
metric_readers = [
PeriodicExportingMetricReader(metric_exporter, export_interval_millis=5000) for metric_exporter in exporters
]
meter_provider = MeterProvider(
metric_readers=metric_readers,
resource=resource,
views=[
# Dropping all instrument names except for those starting with "agent_framework"
View(instrument_name="*", aggregation=DropAggregation()),
View(instrument_name="agent_framework*"),
],
)
# Sets the global default meter provider
set_meter_provider(meter_provider)
def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
async def run_chat_client(stream: bool = False) -> None:
"""Run an AI service.
This function runs an AI service and prints the output.
Telemetry will be collected for the service execution behind the scenes,
and the traces will be sent to the configured telemetry backend.
The telemetry will include information about the AI service execution.
Args:
stream (bool): Whether to use streaming for the plugin
"""
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span(
"Scenario: Chat Client Stream" if stream else "Scenario: Chat Client", kind=SpanKind.CLIENT
) as current_span:
print("Running scenario: Chat Client" if not stream else "Running scenario: Chat Client Stream")
try:
client = OpenAIChatClient()
message = "What's the weather in Amsterdam and in Paris?"
print(f"User: {message}")
if stream:
print("Assistant: ", end="")
async for chunk in client.get_streaming_response(message, tools=get_weather):
if str(chunk):
print(str(chunk), end="")
print("")
else:
response = await client.get_response(message, tools=get_weather)
print(f"Assistant: {response}")
except Exception as e:
current_span.record_exception(e)
print(f"Error running AI service: {e}")
async def run_ai_function() -> None:
"""Run a AI function.
This function runs a AI function and prints the output.
Telemetry will be collected for the function execution behind the scenes,
and the traces will be sent to the configured telemetry backend.
The telemetry will include information about the AI function execution
and the AI service execution.
"""
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("Scenario: AI Function", kind=SpanKind.CLIENT) as current_span:
print("Running scenario: AI Function")
try:
func = ai_function(get_weather)
weather = await func.invoke(location="Amsterdam")
print(f"Weather in Amsterdam:\n{weather}")
except Exception as e:
current_span.record_exception(e)
print(f"Error running kernel plugin: {e}")
async def main(scenario: Literal["chat_client", "chat_client_stream", "ai_function", "all"] = "all"):
# Set up the providers
# This must be done before any other telemetry calls
set_up_logging()
set_up_tracing()
set_up_metrics()
tracer = trace.get_tracer("agent_framework")
with tracer.start_as_current_span("Scenario's", kind=SpanKind.CLIENT) as current_span:
print(f"Trace ID: {format_trace_id(current_span.get_span_context().trace_id)}")
# Scenarios where telemetry is collected in the SDK, from the most basic to the most complex.
if scenario == "chat_client" or scenario == "all":
await run_chat_client(stream=False)
if scenario == "chat_client_stream" or scenario == "all":
await run_chat_client(stream=True)
if scenario == "ai_function" or scenario == "all":
await run_ai_function()
if __name__ == "__main__":
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument(
"--scenario",
type=str,
choices=SCENARIOS,
default="all",
help="The scenario to run. Default is all.",
)
args = arg_parser.parse_args()
asyncio.run(main(args.scenario))
@@ -1,253 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
# type: ignore
import asyncio
import logging
from typing import Any
from agent_framework.workflow import (
Executor,
WorkflowBuilder,
WorkflowCompletedEvent,
WorkflowContext,
handler,
)
from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry import trace
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.view import DropAggregation, View
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.semconv.attributes import service_attributes
from opentelemetry.trace import SpanKind, set_tracer_provider
from opentelemetry.trace.span import format_trace_id
from pydantic_settings import BaseSettings
"""Telemetry sample demonstrating OpenTelemetry integration with Agent Framework workflows.
This sample runs a simple sequential workflow with telemetry collection,
showing telemetry collection for workflow execution, executor processing,
and message publishing between executors.
"""
class TelemetrySampleSettings(BaseSettings):
"""Settings for the telemetry sample application.
Optional settings are:
- connection_string: str - The connection string for the Application Insights resource.
This value can be found in the Overview section when examining
your resource from the Azure portal.
(Env var CONNECTION_STRING)
- otlp_endpoint: str - The OTLP endpoint to send telemetry data to.
Depending on the exporter used, you may find this value in different places.
(Env var OTLP_ENDPOINT)
If no connection string or OTLP endpoint is provided, the telemetry data will be
exported to the console.
"""
connection_string: str | None = None
otlp_endpoint: str | None = None
# Load settings
settings = TelemetrySampleSettings()
# Create a resource to represent the service/sample
resource = Resource.create({service_attributes.SERVICE_NAME: "WorkflowTelemetryExample"})
if settings.connection_string:
configure_azure_monitor(
connection_string=settings.connection_string,
enable_live_metrics=True,
logger_name="agent_framework",
)
def set_up_logging():
class LogFilter(logging.Filter):
"""A filter to not process records from several subpackages."""
# These are the namespaces that we want to exclude from logging for the purposes of this demo.
namespaces_to_exclude: list[str] = [
"httpx",
"openai",
]
def filter(self, record):
return not any([record.name.startswith(namespace) for namespace in self.namespaces_to_exclude])
exporters = []
if settings.otlp_endpoint:
exporters.append(OTLPLogExporter(endpoint=settings.otlp_endpoint))
if not exporters:
exporters.append(ConsoleLogExporter())
# Create and set a global logger provider for the application.
logger_provider = LoggerProvider(resource=resource)
# Log processors are initialized with an exporter which is responsible
# for sending the telemetry data to a particular backend.
for log_exporter in exporters:
logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
# Sets the global default logger provider
set_logger_provider(logger_provider)
# Create a logging handler to write logging records, in OTLP format, to the exporter.
handler = LoggingHandler()
handler.addFilter(LogFilter())
# Attach the handler to the root logger. `getLogger()` with no arguments returns the root logger.
# Events from all child loggers will be processed by this handler.
logger = logging.getLogger()
logger.addHandler(handler)
# Set the logging level to NOTSET to allow all records to be processed by the handler.
logger.setLevel(logging.NOTSET)
def set_up_tracing():
exporters = []
if settings.otlp_endpoint:
exporters.append(OTLPSpanExporter(endpoint=settings.otlp_endpoint))
if not exporters:
exporters.append(ConsoleSpanExporter())
# Initialize a trace provider for the application. This is a factory for creating tracers.
tracer_provider = TracerProvider(resource=resource)
# Span processors are initialized with an exporter which is responsible
# for sending the telemetry data to a particular backend.
for exporter in exporters:
tracer_provider.add_span_processor(BatchSpanProcessor(exporter))
# Sets the global default tracer provider
set_tracer_provider(tracer_provider)
def set_up_metrics():
exporters = []
if settings.otlp_endpoint:
exporters.append(OTLPMetricExporter(endpoint=settings.otlp_endpoint))
if not exporters:
exporters.append(ConsoleMetricExporter())
# Initialize a metric provider for the application. This is a factory for creating meters.
metric_readers = [
PeriodicExportingMetricReader(metric_exporter, export_interval_millis=5000) for metric_exporter in exporters
]
meter_provider = MeterProvider(
metric_readers=metric_readers,
resource=resource,
views=[
# Dropping all instrument names except for those starting with "agent_framework"
View(instrument_name="*", aggregation=DropAggregation()),
View(instrument_name="agent_framework*"),
],
)
# Sets the global default meter provider
set_meter_provider(meter_provider)
# Executors for sequential workflow
class UpperCaseExecutor(Executor):
"""An executor that converts text to uppercase."""
@handler
async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
"""Execute the task by converting the input string to uppercase."""
print(f"UpperCaseExecutor: Processing '{text}'")
result = text.upper()
print(f"UpperCaseExecutor: Result '{result}'")
# Send the result to the next executor in the workflow.
await ctx.send_message(result)
class ReverseTextExecutor(Executor):
"""An executor that reverses text."""
@handler
async def reverse_text(self, text: str, ctx: WorkflowContext[Any]) -> None:
"""Execute the task by reversing the input string."""
print(f"ReverseTextExecutor: Processing '{text}'")
result = text[::-1]
print(f"ReverseTextExecutor: Result '{result}'")
# Send the result with a workflow completion event.
await ctx.add_event(WorkflowCompletedEvent(result))
async def run_sequential_workflow() -> None:
"""Run a simple sequential workflow demonstrating telemetry collection.
This workflow processes a string through two executors in sequence:
1. UpperCaseExecutor converts the input to uppercase
2. ReverseTextExecutor reverses the string and completes the workflow
Telemetry data collected includes:
- Overall workflow execution spans
- Individual executor processing spans
- Message publishing between executors
- Workflow completion events
"""
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("Scenario: Sequential Workflow", kind=SpanKind.CLIENT) as current_span:
print("Running scenario: Sequential Workflow")
try:
# Step 1: Create the executors.
upper_case_executor = UpperCaseExecutor(id="upper_case_executor")
reverse_text_executor = ReverseTextExecutor(id="reverse_text_executor")
# Step 2: Build the workflow with the defined edges.
workflow = (
WorkflowBuilder()
.add_edge(upper_case_executor, reverse_text_executor)
.set_start_executor(upper_case_executor)
.build()
)
# Step 3: Run the workflow with an initial message.
input_text = "hello world"
print(f"Starting workflow with input: '{input_text}'")
completion_event = None
async for event in workflow.run_stream(input_text):
print(f"Event: {event}")
if isinstance(event, WorkflowCompletedEvent):
# The WorkflowCompletedEvent contains the final result.
completion_event = event
if completion_event:
print(f"Workflow completed with result: '{completion_event.data}'")
else:
print("Workflow completed without a completion event")
except Exception as e:
current_span.record_exception(e)
print(f"Error running workflow: {e}")
async def main():
"""Run the telemetry sample with a simple sequential workflow."""
# Set up the providers
# This must be done before any other telemetry calls
set_up_logging()
set_up_tracing()
set_up_metrics()
tracer = trace.get_tracer("agent_framework")
with tracer.start_as_current_span("Sequential Workflow Scenario", kind=SpanKind.CLIENT) as current_span:
print(f"Trace ID: {format_trace_id(current_span.get_span_context().trace_id)}")
# Run the sequential workflow scenario
await run_sequential_workflow()
if __name__ == "__main__":
asyncio.run(main())
@@ -64,7 +64,7 @@ async def main():
# AgentRunUpdateEvent contains incremental text deltas from the underlying agent.
# Print a prefix when the executor changes, then append updates on the same line.
eid = event.executor_id
if eid != last_executor_id:
if eid != last_executor_id: # type: ignore[reportUnnecessaryComparison]
if last_executor_id is not None:
print()
print(f"{eid}:", end=" ", flush=True)