Merge branch 'main' into local-branch-fix-workflow-as-agent-pending-request-handling

This commit is contained in:
Tao Chen
2026-06-01 22:15:14 -07:00
Unverified
116 changed files with 9188 additions and 4649 deletions
+54
View File
@@ -0,0 +1,54 @@
# A2A Client Samples
These samples demonstrate how to **consume** remote A2A-compliant agents using the Agent Framework's `A2AAgent` class.
For hosting your own agents as A2A servers, see [`samples/04-hosting/a2a/`](../../04-hosting/a2a/).
## Samples
| Sample | Concept |
|--------|---------|
| [`agent_with_a2a.py`](agent_with_a2a.py) | Basic consumption — non-streaming and streaming |
| [`a2a_agent_as_function_tools.py`](a2a_agent_as_function_tools.py) | Expose A2A skills as function tools for a host agent |
| [`a2a_polling.py`](a2a_polling.py) | Poll a long-running task with continuation tokens |
| [`a2a_stream_reconnection.py`](a2a_stream_reconnection.py) | Resume an interrupted stream via continuation token |
| [`a2a_protocol_selection.py`](a2a_protocol_selection.py) | Configure preferred protocol bindings (JSONRPC, GRPC, HTTP+JSON) |
## Prerequisites
- A running A2A-compliant agent server (see `samples/04-hosting/a2a/` to start one)
- Set `A2A_AGENT_HOST` environment variable to the server URL
- For `a2a_agent_as_function_tools.py`: also set `FOUNDRY_PROJECT_ENDPOINT` and `FOUNDRY_MODEL`
## Running
```bash
cd python/samples/02-agents/a2a
# Start an A2A server in another terminal first:
# cd python/samples/04-hosting/a2a && uv run python a2a_server.py
export A2A_AGENT_HOST="http://localhost:5001/"
uv run python agent_with_a2a.py
```
## Key APIs
```python
from agent_framework.a2a import A2AAgent
# Connect to a remote agent
async with A2AAgent(url="http://localhost:5001/", agent_card=card) as agent:
# Non-streaming
response = await agent.run("Hello")
# Streaming
stream = agent.run("Hello", stream=True)
async for update in stream:
print(update.text)
# Background + polling
response = await agent.run("Long task", background=True)
while response.continuation_token:
response = await agent.poll_task(response.continuation_token)
```
@@ -33,7 +33,7 @@ Prerequisites:
- Set FOUNDRY_MODEL to the model deployment name (e.g. gpt-4o)
To run this sample:
cd python/samples/04-hosting/a2a
cd python/samples/02-agents/a2a
uv run python a2a_agent_as_function_tools.py
"""
@@ -0,0 +1,96 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import os
import httpx
from a2a.client import A2ACardResolver
from agent_framework.a2a import A2AAgent
from dotenv import load_dotenv
load_dotenv()
"""
A2A Polling for Task Completion
This sample demonstrates how to poll a long-running A2A task for completion
using continuation tokens. When `background=True`, the agent returns immediately
with a continuation token that you can use to check progress later.
Key concepts demonstrated:
- Starting a background A2A task with `background=True`
- Receiving a continuation token for in-progress tasks
- Polling with `poll_task()` until the task reaches a terminal state
This is the A2A equivalent of the .NET A2AAgent_PollingForTaskCompletion sample.
Prerequisites:
- Set A2A_AGENT_HOST to the URL of a running A2A server
To run this sample:
cd python/samples/02-agents/a2a
uv run python a2a_polling.py
"""
async def main() -> None:
"""Demonstrates polling a long-running A2A task for completion."""
a2a_agent_host = os.getenv("A2A_AGENT_HOST")
if not a2a_agent_host:
raise ValueError("A2A_AGENT_HOST environment variable is not set")
# 1. Resolve agent card and create agent.
async with httpx.AsyncClient(timeout=60.0) as http_client:
resolver = A2ACardResolver(httpx_client=http_client, base_url=a2a_agent_host)
agent_card = await resolver.get_agent_card()
async with A2AAgent(
name=agent_card.name,
agent_card=agent_card,
url=a2a_agent_host,
) as agent:
# 2. Start a background task — the agent returns immediately.
print("Starting background task...")
response = await agent.run(
"Write a detailed research report on quantum computing advances in 2025",
background=True,
)
# 3. Check if we got a continuation token (task still in progress).
if response.continuation_token is None:
# Task completed immediately — no polling needed.
print("Task completed immediately:")
print(f" {response.text}")
return
# 4. Poll until the task completes.
token = response.continuation_token
poll_count = 0
while token is not None:
poll_count += 1
print(f" Poll #{poll_count} — task still in progress, waiting 2s...")
await asyncio.sleep(2)
response = await agent.poll_task(token) # type: ignore[arg-type]
token = response.continuation_token
# 5. Task is done — print the final response.
print(f"\nTask completed after {poll_count} poll(s):")
print(f" {response.text[:200]}...")
if __name__ == "__main__":
asyncio.run(main())
"""
Sample output:
Starting background task...
Poll #1 — task still in progress, waiting 2s...
Poll #2 — task still in progress, waiting 2s...
Poll #3 — task still in progress, waiting 2s...
Task completed after 3 poll(s):
Quantum computing has seen remarkable progress in 2025, with breakthroughs in...
"""
@@ -0,0 +1,84 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import os
import httpx
from a2a.client import A2ACardResolver
from agent_framework.a2a import A2AAgent
from dotenv import load_dotenv
load_dotenv()
"""
A2A Protocol Selection
This sample demonstrates how to configure which protocol binding the A2A client
uses when connecting to a remote agent. The A2A specification defines three
standard bindings: JSONRPC, GRPC, and HTTP+JSON. Agents declare their supported
bindings in their AgentCard, and clients can express a preference.
Key concepts demonstrated:
- Configuring `supported_protocol_bindings` on A2AAgent
- The client selects a binding that matches the remote agent's capabilities
- Fallback behavior when preferred binding is unavailable
This is the A2A equivalent of the .NET A2AAgent_ProtocolSelection sample.
Prerequisites:
- Set A2A_AGENT_HOST to the URL of a running A2A server
To run this sample:
cd python/samples/02-agents/a2a
uv run python a2a_protocol_selection.py
"""
async def main() -> None:
"""Demonstrates configuring A2A protocol binding preferences."""
a2a_agent_host = os.getenv("A2A_AGENT_HOST")
if not a2a_agent_host:
raise ValueError("A2A_AGENT_HOST environment variable is not set")
# 1. Resolve agent card to see what bindings are available.
async with httpx.AsyncClient(timeout=60.0) as http_client:
resolver = A2ACardResolver(httpx_client=http_client, base_url=a2a_agent_host)
agent_card = await resolver.get_agent_card()
print(f"Agent: {agent_card.name}")
print("Supported interfaces:")
for interface in agent_card.supported_interfaces:
print(f" - {interface.protocol_binding} @ {interface.url}")
# 2. Create agent with explicit protocol binding preference.
# The list is ordered by preference — the SDK will select the first
# binding that matches a supported interface on the agent card.
#
# This matters when a server exposes multiple interfaces (e.g. JSONRPC
# on / and HTTP+JSON on /api/). If only one binding is available, the
# client uses it regardless of your preference list.
async with A2AAgent(
name=agent_card.name,
agent_card=agent_card,
url=a2a_agent_host,
supported_protocol_bindings=["HTTP+JSON", "JSONRPC"],
) as agent:
print("\nConfigured bindings: ['HTTP+JSON', 'JSONRPC']")
response = await agent.run("Tell me a short joke")
print(f"Response: {response.text}")
if __name__ == "__main__":
asyncio.run(main())
"""
Sample output:
Agent: PolicyAgent
Supported interfaces:
- JSONRPC @ http://localhost:5001/
Configured bindings: ['HTTP+JSON', 'JSONRPC']
Response: Here's a short joke for you...
"""
@@ -0,0 +1,124 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import os
import httpx
from a2a.client import A2ACardResolver
from agent_framework.a2a import A2AAgent
from dotenv import load_dotenv
load_dotenv()
"""
A2A Stream Reconnection
This sample demonstrates how to reconnect to an interrupted A2A stream
using a continuation token. When streaming a long-running task, you can
capture the continuation token from any update and use it to resume the
stream later if the connection is lost.
Key concepts demonstrated:
- Streaming an A2A response with `stream=True`
- Capturing continuation tokens from in-progress updates
- Simulating a stream interruption (break)
- Resuming the stream with `run(continuation_token=..., stream=True)`
This is the A2A equivalent of the .NET A2AAgent_StreamReconnection sample.
Prerequisites:
- Set A2A_AGENT_HOST to the URL of a running A2A server
To run this sample:
cd python/samples/02-agents/a2a
uv run python a2a_stream_reconnection.py
"""
async def main() -> None:
"""Demonstrates reconnecting to an interrupted A2A stream."""
a2a_agent_host = os.getenv("A2A_AGENT_HOST")
if not a2a_agent_host:
raise ValueError("A2A_AGENT_HOST environment variable is not set")
# 1. Resolve agent card and create agent.
async with httpx.AsyncClient(timeout=60.0) as http_client:
resolver = A2ACardResolver(httpx_client=http_client, base_url=a2a_agent_host)
agent_card = await resolver.get_agent_card()
async with A2AAgent(
name=agent_card.name,
agent_card=agent_card,
url=a2a_agent_host,
) as agent:
# 2. Start a streaming background task.
print("Starting streaming task...")
stream = agent.run(
"Write a long essay about the history of artificial intelligence",
stream=True,
background=True,
)
# 3. Read a few updates, capture the continuation token, then "disconnect".
saved_token = None
update_count = 0
async for update in stream:
update_count += 1
if update.continuation_token:
saved_token = update.continuation_token
for content in update.contents:
if content.text:
print(content.text, end="", flush=True)
# Simulate a disconnect after receiving 3 updates.
if update_count >= 3:
print("\n\n--- Connection interrupted! ---\n")
break
if saved_token is None:
print("No continuation token received — task may have completed before interruption.")
return
# 4. Reconnect using the saved continuation token.
# background=True is required so that in-progress task updates
# surface continuation tokens (matching the A2AAgent contract).
print(f"Reconnecting with continuation token (task_id={saved_token['task_id']})...")
resumed_stream = agent.run(
continuation_token=saved_token,
stream=True,
background=True,
)
# 5. Continue receiving updates from where we left off.
async for update in resumed_stream:
update_count += 1
for content in update.contents:
if content.text:
print(content.text, end="", flush=True)
print() # newline after streaming completes
response = await resumed_stream.get_final_response()
print(f"\nStream completed. Total updates: {update_count}")
print(f"Final response: {len(response.messages)} message(s)")
if __name__ == "__main__":
asyncio.run(main())
"""
Sample output:
Starting streaming task...
Policy:
--- Connection interrupted! ---
Reconnecting with continuation token (task_id=task-abc123)...
Short Shipment Dispute Handling Policy V2.1
Summary: "For short shipments reported by customers, first verify internal..."
Stream completed. Total updates: 106
Final response: 103 message(s)
"""
@@ -22,7 +22,7 @@ technologies to communicate seamlessly.
By default the A2AAgent waits for the remote agent to finish before returning (background=False).
This means long-running A2A tasks are handled transparently the caller simply awaits the result.
For advanced scenarios where you need to poll or resubscribe to in-progress tasks, see the
background_responses sample: samples/concepts/background_responses.py
a2a_polling and a2a_stream_reconnection samples in this folder.
For more information about the A2A protocol specification, visit: https://a2a-protocol.org/latest/
@@ -70,9 +70,7 @@ async def main():
print("\n--- Non-streaming response ---")
response = await agent.run("What are your capabilities?")
print("Agent Response:")
for message in response.messages:
print(f" {message.text}")
print(f"Agent Response:\n {response.text}")
# 5. Stream a response — the natural model for A2A.
# Updates arrive as Server-Sent Events, letting you observe
@@ -82,12 +80,11 @@ async def main():
async for update in stream:
for content in update.contents:
if content.text:
print(f" {content.text}")
print(content.text, end="", flush=True)
print() # newline after streaming completes
response = await stream.get_final_response()
print(f"\nFinal response ({len(response.messages)} message(s)):")
for message in response.messages:
print(f" {message.text}")
print(f"\nFinal response:\n {response.text}")
if __name__ == "__main__":
@@ -105,8 +102,8 @@ Agent Response:
I can help with code generation, analysis, and general Q&A.
--- Streaming response ---
I am an AI assistant built to help with various tasks.
I am an AI assistant built to help with various tasks.
Final response (1 message(s)):
Final response:
I am an AI assistant built to help with various tasks.
"""
@@ -109,7 +109,10 @@ async def main() -> None:
print(f"\n [calling tool: {content.name}]", flush=True)
print(" ", end="", flush=True)
# Show web search activity when the result arrives with action details.
elif content.type in ("search_tool_call", "search_tool_result") and getattr(content, "tool_name", None) == "web_search":
elif (
content.type in ("search_tool_call", "search_tool_result")
and getattr(content, "tool_name", None) == "web_search"
):
action = None
if content.type == "search_tool_result" and isinstance(content.result, dict):
action = content.result.get("action", {})
+13 -35
View File
@@ -1,39 +1,30 @@
# A2A Agent Examples
# A2A Server Hosting Examples
This sample demonstrates how to host and consume agents using the [A2A (Agent2Agent) protocol](https://a2a-protocol.org/latest/) with the `agent_framework` package. There are three runnable entry points:
This sample demonstrates how to **host** Agent Framework agents as A2A-compliant servers using the [A2A (Agent2Agent) protocol](https://a2a-protocol.org/latest/).
> **Looking for client samples?** See [`samples/02-agents/a2a/`](../../02-agents/a2a/) for consuming remote A2A agents.
## Server Samples
| Run this file | To... |
|---------------|-------|
| **[`a2a_server.py`](a2a_server.py)** | Host an Agent Framework agent as an A2A-compliant server. |
| **[`agent_with_a2a.py`](agent_with_a2a.py)** | Connect to an A2A server and send requests (non-streaming and streaming). |
| **[`a2a_agent_as_function_tools.py`](a2a_agent_as_function_tools.py)** | Convert A2A agent skills into function tools for a host agent. |
| **[`a2a_server.py`](a2a_server.py)** | Host an Agent Framework agent as an A2A-compliant server (multi-agent). |
| **[`agent_framework_to_a2a.py`](agent_framework_to_a2a.py)** | Minimal example: expose a single agent as an A2A server. |
The remaining files are supporting modules used by the server:
## Supporting Modules
| File | Description |
|------|-------------|
| [`agent_framework_to_a2a.py`](agent_framework_to_a2a.py) | Exposes an agent_framework agent as an A2A-compliant server. Demonstrates how to wrap an agent_framework agent and expose it as an A2A service that other A2A clients can discover and communicate with. |
| [`agent_definitions.py`](agent_definitions.py) | Agent and AgentCard factory definitions for invoice, policy, and logistics agents. |
| [`agent_executor.py`](agent_executor.py) | Bridges the a2a-sdk `AgentExecutor` interface to Agent Framework agents. |
| [`invoice_data.py`](invoice_data.py) | Mock invoice data and tool functions for the invoice agent. |
| [`a2a_server.http`](a2a_server.http) | REST Client requests for testing the server directly from VS Code. |
## Environment Variables
Make sure to set the following environment variables before running the examples:
### Required (Server)
- `FOUNDRY_PROJECT_ENDPOINT` — Your Azure AI Foundry project endpoint
- `FOUNDRY_MODEL` — Model deployment name (e.g. `gpt-4o`)
### Required (Client)
- `A2A_AGENT_HOST` — URL of the A2A server (e.g. `http://localhost:5001/`)
### Required (Function Tools Sample)
- `A2A_AGENT_HOST` — URL of the A2A server (e.g. `http://localhost:5000/`)
- `FOUNDRY_PROJECT_ENDPOINT` — Your Azure AI Foundry project endpoint
- `FOUNDRY_MODEL` — Model deployment name (e.g. `gpt-4o`)
## Quick Start
All commands below should be run from this directory:
@@ -67,7 +58,7 @@ uv run python a2a_server.py --agent-type policy
### 1. Start the A2A Server
> **Note (Option A — pip users):** Replace `uv run python` with `python` in all `uv run` commands below (e.g. `python a2a_server.py ...`). `uv` is not required once the virtual environment is activated.
> **Note (Option A — pip users):** Replace `uv run python` with `python` in all `uv run` commands below. `uv` is not required once the virtual environment is activated.
Pick an agent type and start the server (each in its own terminal):
@@ -79,25 +70,12 @@ uv run python a2a_server.py --agent-type logistics --port 5002
You can run one agent or all three — each listens on its own port.
### 2. Run the A2A Client
### 2. Run a Client
In a separate terminal (from the same directory), point the client at a running server:
Once a server is running, use any of the client samples in [`samples/02-agents/a2a/`](../../02-agents/a2a/):
```powershell
cd python/samples/02-agents/a2a
$env:A2A_AGENT_HOST = "http://localhost:5001/"
uv run python agent_with_a2a.py
# A2A server exposing an agent_framework agent
uv run python agent_framework_to_a2a.py
```
### 3. Run the Function Tools Sample
This sample resolves the remote agent's skills and registers each one as a function tool
on a host Foundry-backed agent. The host agent then autonomously selects the right skill
to handle the user's request.
```powershell
$env:A2A_AGENT_HOST = "http://localhost:5000/"
uv run python a2a_agent_as_function_tools.py
```
+2 -2
View File
@@ -9,7 +9,7 @@ from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.routes import create_agent_card_routes, create_jsonrpc_routes
from a2a.server.tasks import InMemoryTaskStore
from agent_definitions import AGENT_CARD_FACTORIES, AGENT_FACTORIES
from agent_executor import AgentFrameworkExecutor
from agent_framework.a2a import A2AExecutor
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
from dotenv import load_dotenv
@@ -92,7 +92,7 @@ def main() -> None:
# Build the A2A server components
url = f"http://{args.host}:{args.port}/"
agent_card = AGENT_CARD_FACTORIES[args.agent_type](url)
executor = AgentFrameworkExecutor(agent)
executor = A2AExecutor(agent, stream=True)
task_store = InMemoryTaskStore()
request_handler = DefaultRequestHandler(
agent_executor=executor,
@@ -1,83 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
"""AgentExecutor bridge between the a2a-sdk server and Agent Framework agents.
Implements the a2a-sdk ``AgentExecutor`` interface so that incoming A2A
requests are forwarded to an Agent Framework agent and the response is
published back through the a2a-sdk event queue.
"""
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from a2a.helpers import new_task_from_user_message
from a2a.server.agent_execution.agent_executor import AgentExecutor
from a2a.server.tasks import TaskUpdater
from a2a.types import Part, TaskState
if TYPE_CHECKING:
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from agent_framework import Agent
class AgentFrameworkExecutor(AgentExecutor):
"""Bridges A2A protocol requests to an Agent Framework agent.
For each incoming ``execute`` call the executor:
1. Extracts the user's text from the A2A ``RequestContext``.
2. Runs the Agent Framework agent (non-streaming).
3. Publishes the result as an A2A ``Message`` to the ``EventQueue``.
"""
def __init__(self, agent: Agent) -> None:
self.agent = agent
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
"""Run the agent and publish the response."""
user_text = context.get_user_input()
if not user_text:
user_text = "Hello"
# v1.0 requires a Task object in the queue before any TaskStatusUpdateEvent
task = context.current_task
if not task and context.message:
task = new_task_from_user_message(context.message)
await event_queue.enqueue_event(task)
task_id = task.id if task else context.task_id
updater = TaskUpdater(event_queue, task_id, context.context_id)
# Signal that the agent is working
await updater.start_work()
try:
response = await self.agent.run(user_text)
# Build response text from agent messages
response_parts: list[Part] = []
for msg in response.messages:
if msg.text:
response_parts.append(Part(text=msg.text))
if not response_parts:
response_parts.append(Part(text=str(response)))
# Publish the agent's response and mark as completed
await updater.complete(
message=updater.new_agent_message(response_parts),
)
except asyncio.CancelledError:
raise
except Exception as e:
await updater.update_status(
state=TaskState.TASK_STATE_FAILED,
message=updater.new_agent_message([Part(text=f"Agent error: {e}")]),
)
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
"""Handle cancellation by publishing a canceled status."""
updater = TaskUpdater(event_queue, context.task_id, context.context_id)
await updater.update_status(state=TaskState.TASK_STATE_CANCELED)
@@ -1,3 +1,12 @@
FOUNDRY_PROJECT_ENDPOINT="<your-project-endpoint>"
FOUNDRY_MODEL="<your-model-deployment>"
# Only needed for evaluate_with_rubric_sample.py — connects to the
# pre-existing Foundry agent that the rubric evaluator was created against.
FOUNDRY_AGENT_NAME="<your-agent-name>"
FOUNDRY_AGENT_VERSION="<your-agent-version>"
# Only needed for evaluate_with_rubric_sample.py — references a rubric
# evaluator you created in Foundry. Pin the version for reproducible runs.
FOUNDRY_RUBRIC_NAME="<your-rubric-name>"
FOUNDRY_RUBRIC_VERSION="<your-rubric-version>"
@@ -35,6 +35,34 @@ Evaluate what already happened — zero changes to agent code:
uv run samples/05-end-to-end/evaluation/foundry_evals/evaluate_traces_sample.py
```
### Referencing a rubric evaluator created in Foundry
Foundry users can create rubric evaluators in the Foundry portal (or
through the dedicated SDK / REST surface). Once an evaluator exists,
agent-framework consumes it like any other evaluator: pass a
`GeneratedEvaluatorRef(name=..., version=...)` in the `evaluators=`
list and pin the version for reproducible runs.
```python
from agent_framework.foundry import FoundryEvals, GeneratedEvaluatorRef
evals = FoundryEvals(
evaluators=[
GeneratedEvaluatorRef(name="reservation-policy-rubric", version="3"),
"relevance",
"coherence",
],
)
```
Quality gates on rubric output use the standard `EvalResults` helpers,
including `assert_dimension_score_at_least(...)` for per-dimension
thresholds.
See [`evaluate_with_rubric_sample.py`](./evaluate_with_rubric_sample.py)
for a runnable end-to-end example that combines a rubric evaluator with
built-in evaluators and gates a per-dimension threshold.
## Setup
Create a `.env` file with configuration as in the `.env.example` file in this folder.
@@ -44,3 +72,4 @@ Create a `.env` file with configuration as in the `.env.example` file in this fo
- **"I want to test my agent during development"** → `evaluate_agent_sample.py`, Pattern 1
- **"I want to evaluate past agent runs"** → `evaluate_traces_sample.py`
- **"I want to inspect/modify eval data before submitting"** → `evaluate_agent_sample.py`, Pattern 2
- **"I want to score against a custom rubric I created in Foundry"** → `evaluate_with_rubric_sample.py`
@@ -0,0 +1,138 @@
# Copyright (c) Microsoft. All rights reserved.
"""Evaluate a Foundry agent against a rubric evaluator that was created in Foundry.
Rubric evaluators are LLM-as-judge evaluators with custom scoring dimensions
that you define for your domain. agent-framework consumes pre-existing rubric
evaluators — they are authored in the Foundry portal (or via the dedicated
SDK / REST surface) and referenced here by name and version.
See: https://learn.microsoft.com/azure/ai-foundry/concepts/evaluation-evaluators/rubric-evaluators
This sample demonstrates:
1. Connecting to a pre-existing Foundry agent (PromptAgent or HostedAgent).
2. Referencing a pre-existing rubric evaluator by ``name`` and ``version``.
3. Mixing the rubric with built-in Foundry evaluators in one run.
4. Asserting per-dimension thresholds with
``EvalResults.assert_dimension_score_at_least(...)`` for CI quality gates.
Starting condition / prerequisites:
- An Azure AI Foundry project with a deployed model.
- A registered Foundry agent (PromptAgent or HostedAgent) in that project.
This is the agent the rubric is meant to evaluate.
- A rubric evaluator already created in the Foundry portal against that
agent. Creating rubrics through the portal currently requires picking a
Foundry agent as the generation context, so this prerequisite is implied
by having a rubric at all.
- Set the following in .env (see ``.env.example``):
- ``FOUNDRY_PROJECT_ENDPOINT``
- ``FOUNDRY_AGENT_NAME`` and ``FOUNDRY_AGENT_VERSION`` for the agent
- ``FOUNDRY_RUBRIC_NAME`` and ``FOUNDRY_RUBRIC_VERSION`` for the rubric
- ``FOUNDRY_MODEL`` for the rubric judge model
"""
import asyncio
import os
from agent_framework import EvalNotPassedError, evaluate_agent
from agent_framework.foundry import FoundryAgent, FoundryChatClient, FoundryEvals, GeneratedEvaluatorRef
from azure.identity import AzureCliCredential
from dotenv import load_dotenv
load_dotenv(override=True)
async def main() -> None:
# 1. Connect to the existing Foundry agent that the rubric was created
# against. PromptAgents and HostedAgents are both supported.
credential = AzureCliCredential()
project_endpoint = os.environ["FOUNDRY_PROJECT_ENDPOINT"]
agent = FoundryAgent(
project_endpoint=project_endpoint,
agent_name=os.environ["FOUNDRY_AGENT_NAME"],
agent_version=os.environ.get("FOUNDRY_AGENT_VERSION"),
credential=credential,
)
# 2. Reference the pre-existing rubric evaluator by name + version.
# Always pin a version for reproducible CI runs; versionless refs
# resolve to "latest" and emit a warning at evaluation time.
rubric_name = os.environ["FOUNDRY_RUBRIC_NAME"]
rubric_version = os.environ["FOUNDRY_RUBRIC_VERSION"]
rubric = GeneratedEvaluatorRef(name=rubric_name, version=rubric_version)
# 3. Mix the rubric with built-in evaluators in a single FoundryEvals
# config. FoundryEvals talks to Foundry over the project endpoint, so
# we hand it a FoundryChatClient configured with the same credential.
eval_client = FoundryChatClient(
project_endpoint=project_endpoint,
model=os.environ["FOUNDRY_MODEL"],
credential=credential,
)
evals = FoundryEvals(
client=eval_client,
evaluators=[
rubric,
FoundryEvals.RELEVANCE,
FoundryEvals.COHERENCE,
],
)
# =========================================================================
# Run evaluation
# =========================================================================
print("=" * 60)
print(f"Evaluating '{agent.name}' with rubric '{rubric_name}' (version {rubric_version})")
print("=" * 60)
results = await evaluate_agent(
agent=agent,
queries=[
"What's the weather like in Seattle?",
"Should I bring an umbrella to London tomorrow?",
],
evaluators=evals,
)
for r in results:
print(f"Status: {r.status}")
print(f"Results: {r.passed}/{r.total} passed")
print(f"Portal: {r.report_url}")
if r.all_passed:
print("[PASS] All passed")
else:
print(f"[FAIL] {r.failed} failed")
# =========================================================================
# Per-dimension quality gate
# =========================================================================
# Rubric evaluators emit per-dimension scores (15) on top of the overall
# weighted score. Use assert_dimension_score_at_least to gate CI on a
# specific dimension — e.g., never ship if a critical dimension drops
# below 3.
#
# The dimension_id must match an id defined on your rubric in Foundry.
# ``general_quality`` is used here because it's the conventional
# ``always_applicable: true`` dimension in the Foundry docs' example
# rubric — swap it for whatever dimension id(s) your rubric actually
# defines.
print()
print("=" * 60)
print("Per-dimension quality gate")
print("=" * 60)
for r in results:
try:
r.assert_dimension_score_at_least(
"general_quality",
min_score=3.0,
evaluator=rubric_name,
)
print(f"[PASS] {r.provider}: general_quality >= 3 on every item")
except EvalNotPassedError as exc:
print(f"[FAIL] {r.provider}: dimension gate tripped: {exc}")
if __name__ == "__main__":
asyncio.run(main())
@@ -74,19 +74,22 @@ async def run_agent_framework() -> None:
client = OpenAIChatClient(model="gpt-4.1-mini")
# Create specialized agents
python_expert = Agent(client=client,
python_expert = Agent(
client=client,
name="python_expert",
instructions="You are a Python programming expert. Answer Python-related questions.",
description="Expert in Python programming",
)
javascript_expert = Agent(client=client,
javascript_expert = Agent(
client=client,
name="javascript_expert",
instructions="You are a JavaScript programming expert. Answer JavaScript-related questions.",
description="Expert in JavaScript programming",
)
database_expert = Agent(client=client,
database_expert = Agent(
client=client,
name="database_expert",
instructions="You are a database expert. Answer SQL and database-related questions.",
description="Expert in databases and SQL",
@@ -95,7 +98,8 @@ async def run_agent_framework() -> None:
workflow = GroupChatBuilder(
participants=[python_expert, javascript_expert, database_expert],
max_rounds=1,
orchestrator_agent=Agent(client=client,
orchestrator_agent=Agent(
client=client,
name="selector_manager",
instructions="Based on the conversation, select the most appropriate expert to respond next.",
),
@@ -113,7 +113,8 @@ async def run_agent_framework() -> None:
client = OpenAIChatClient(model="gpt-4.1-mini")
# Create triage agent
triage_agent = Agent(client=client,
triage_agent = Agent(
client=client,
name="triage",
instructions=(
"You are a triage agent. Analyze the user's request and route to the appropriate specialist:\n"
@@ -125,7 +126,8 @@ async def run_agent_framework() -> None:
)
# Create billing specialist
billing_agent = Agent(client=client,
billing_agent = Agent(
client=client,
name="billing_agent",
instructions="You are a billing specialist. Help with payment and billing questions. Provide clear assistance.",
description="Handles billing and payment questions",
@@ -133,7 +135,8 @@ async def run_agent_framework() -> None:
)
# Create technical support specialist
tech_support = Agent(client=client,
tech_support = Agent(
client=client,
name="technical_support",
instructions="You are technical support. Help with technical issues. Provide clear assistance.",
description="Handles technical support questions",
@@ -73,7 +73,8 @@ async def run_agent_framework() -> None:
# Create agent with tool
client = OpenAIChatClient(model="gpt-4.1-mini")
agent = Agent(client=client,
agent = Agent(
client=client,
name="assistant",
instructions="You are a helpful assistant. Use available tools to answer questions.",
tools=[get_weather],
@@ -61,7 +61,8 @@ async def run_agent_framework() -> None:
client = OpenAIChatClient(model="gpt-4.1-mini")
# Create specialized writer agent
writer = Agent(client=client,
writer = Agent(
client=client,
name="writer",
instructions="You are a creative writer. Write short, engaging content.",
)
@@ -75,7 +76,8 @@ async def run_agent_framework() -> None:
)
# Create coordinator agent with writer tool
coordinator = Agent(client=client,
coordinator = Agent(
client=client,
name="coordinator",
instructions="You coordinate with specialized agents. Delegate writing tasks to the writer agent.",
tools=[writer_tool],
@@ -78,12 +78,14 @@ async def sk_agent_response_callback(
async def run_agent_framework_example(prompt: str) -> list[Message]:
client = OpenAIChatCompletionClient(credential=AzureCliCredential())
writer = Agent(client=client,
writer = Agent(
client=client,
instructions=("You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."),
name="writer",
)
reviewer = Agent(client=client,
reviewer = Agent(
client=client,
instructions=("You are a thoughtful reviewer. Give brief feedback on the previous assistant message."),
name="reviewer",
)