From 0aa8d30d7f5d94606c65f78ba3f35da9d95f3801 Mon Sep 17 00:00:00 2001 From: Laveesh Rohra Date: Fri, 7 Nov 2025 10:37:03 -0800 Subject: [PATCH] Python: Add more samples for Azure Functions (#1980) * Move all samples * fix comments * remove dead lines * Make samples simpler --- .../azure_functions/01_single_agent/README.md | 16 +- .../azure_functions/01_single_agent/demo.http | 2 +- .../01_single_agent/function_app.py | 65 +-- ...ings.json => local.settings.json.template} | 0 .../azure_functions/02_multi_agent/README.md | 146 +++++++ .../azure_functions/02_multi_agent/demo.http | 58 +++ .../02_multi_agent/function_app.py | 99 +++++ .../azure_functions/02_multi_agent/host.json | 15 + .../local.settings.json.template | 10 + .../02_multi_agent/requirements.txt | 2 + .../azure_functions/03_callbacks/README.md | 109 +++++ .../azure_functions/03_callbacks/demo.http | 30 ++ .../03_callbacks/function_app.py | 178 ++++++++ .../azure_functions/03_callbacks/host.json | 15 + .../03_callbacks/local.settings.json.template | 10 + .../03_callbacks/requirements.txt | 2 + .../README.md | 56 +++ .../demo.http | 9 + .../function_app.py | 174 ++++++++ .../host.json | 23 + .../local.settings.json.template | 10 + .../requirements.txt | 2 + .../README.md | 61 +++ .../demo.http | 11 + .../function_app.py | 192 +++++++++ .../host.json | 23 + .../local.settings.json.template | 10 + .../requirements.txt | 2 + .../README.md | 27 ++ .../demo.http | 24 ++ .../function_app.py | 273 ++++++++++++ .../host.json | 23 + .../local.settings.json.template | 10 + .../requirements.txt | 2 + .../README.md | 33 ++ .../demo.http | 45 ++ .../function_app.py | 399 ++++++++++++++++++ .../host.json | 23 + .../local.settings.json.template | 10 + .../requirements.txt | 2 + 40 files changed, 2155 insertions(+), 46 deletions(-) rename python/samples/getting_started/azure_functions/01_single_agent/{local.settings.json => local.settings.json.template} (100%) create mode 100644 python/samples/getting_started/azure_functions/02_multi_agent/README.md create mode 100644 python/samples/getting_started/azure_functions/02_multi_agent/demo.http create mode 100644 python/samples/getting_started/azure_functions/02_multi_agent/function_app.py create mode 100644 python/samples/getting_started/azure_functions/02_multi_agent/host.json create mode 100644 python/samples/getting_started/azure_functions/02_multi_agent/local.settings.json.template create mode 100644 python/samples/getting_started/azure_functions/02_multi_agent/requirements.txt create mode 100644 python/samples/getting_started/azure_functions/03_callbacks/README.md create mode 100644 python/samples/getting_started/azure_functions/03_callbacks/demo.http create mode 100644 python/samples/getting_started/azure_functions/03_callbacks/function_app.py create mode 100644 python/samples/getting_started/azure_functions/03_callbacks/host.json create mode 100644 python/samples/getting_started/azure_functions/03_callbacks/local.settings.json.template create mode 100644 python/samples/getting_started/azure_functions/03_callbacks/requirements.txt create mode 100644 python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/README.md create mode 100644 python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/demo.http create mode 100644 python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/function_app.py create mode 100644 python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/host.json create mode 100644 python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/local.settings.json.template create mode 100644 python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/requirements.txt create mode 100644 python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/README.md create mode 100644 python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/demo.http create mode 100644 python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/function_app.py create mode 100644 python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/host.json create mode 100644 python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/local.settings.json.template create mode 100644 python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/requirements.txt create mode 100644 python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/README.md create mode 100644 python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/demo.http create mode 100644 python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/function_app.py create mode 100644 python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/host.json create mode 100644 python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/local.settings.json.template create mode 100644 python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/requirements.txt create mode 100644 python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/README.md create mode 100644 python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/demo.http create mode 100644 python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/function_app.py create mode 100644 python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/host.json create mode 100644 python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/local.settings.json.template create mode 100644 python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/requirements.txt diff --git a/python/samples/getting_started/azure_functions/01_single_agent/README.md b/python/samples/getting_started/azure_functions/01_single_agent/README.md index cb4b055c5d..4163c29ac3 100644 --- a/python/samples/getting_started/azure_functions/01_single_agent/README.md +++ b/python/samples/getting_started/azure_functions/01_single_agent/README.md @@ -32,7 +32,7 @@ source .venv/bin/activate - Azurite storage emulator – the sample uses `AzureWebJobsStorage=UseDevelopmentStorage=true`; start Azurite before launching the app. - Durable Task local backend – `DURABLE_TASK_SCHEDULER_CONNECTION_STRING` expects the Durable Task scheduler listening on `http://localhost:8080` (start the Durable Functions emulator if it is not already running). - Python dependencies – from this folder, run `pip install -r requirements.txt` (or the equivalent in your active virtual environment). -- Environment variables – update `AZURE_OPENAI_ENDPOINT` and `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME` in `local.settings.json` with your Azure OpenAI resource details; keep the other values as provided unless you are using custom infrastructure. +- Copy `local.settings.json.template` to `local.settings.json` and update the values for `AZURE_OPENAI_ENDPOINT` and `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME` (and optionally `AZURE_OPENAI_API_KEY`) with your Azure OpenAI resource details; keep the other values as provided unless you are using custom infrastructure. ## Running the Sample @@ -47,3 +47,17 @@ curl -X POST http://localhost:7071/api/agents/Joker/run \ ``` The agent responds with a JSON payload that includes the generated joke. + +## Expected Output + +When you send a POST request with plain-text input, the Functions host responds with an HTTP 202 and queues the request for the durable agent entity. A typical response body looks like the following: + +```json +{ + "status": "accepted", + "response": "Agent request accepted", + "message": "Tell me a short joke about cloud computing.", + "conversation_id": "", + "correlation_id": "" +} +``` diff --git a/python/samples/getting_started/azure_functions/01_single_agent/demo.http b/python/samples/getting_started/azure_functions/01_single_agent/demo.http index cc93ee79b5..1f7cbacba1 100644 --- a/python/samples/getting_started/azure_functions/01_single_agent/demo.http +++ b/python/samples/getting_started/azure_functions/01_single_agent/demo.http @@ -14,7 +14,7 @@ Content-Type: application/json { "message": "Add a security element to it.", "sessionId": "session-003", - "waitForCompletion": true + "waitForCompletion": false } ### Ask for a joke (plain text payload) diff --git a/python/samples/getting_started/azure_functions/01_single_agent/function_app.py b/python/samples/getting_started/azure_functions/01_single_agent/function_app.py index f317459321..1ba1997906 100644 --- a/python/samples/getting_started/azure_functions/01_single_agent/function_app.py +++ b/python/samples/getting_started/azure_functions/01_single_agent/function_app.py @@ -1,61 +1,38 @@ -"""Azure Functions single-agent sample showcasing how to host a single Azure OpenAI agent. +"""Host a single Azure OpenAI-powered agent inside Azure Functions. -The sample reads the required endpoint and deployment environment variables, configures the Azure OpenAI chat client (using either an API key or Azure CLI credentials), and registers a joke-telling agent with an Azure Functions app that can optionally expose a health check. +Components used in this sample: +- AzureOpenAIChatClient to call the Azure OpenAI chat deployment. +- AgentFunctionApp to expose HTTP endpoints via the Durable Functions extension. -Summary: Demonstrates configuring and deploying a single 'Joker' agent via Azure Functions.""" +Prerequisites: set `AZURE_OPENAI_ENDPOINT` and `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME` (plus `AZURE_OPENAI_API_KEY` or Azure CLI authentication) before starting the Functions host.""" -import logging -import os from typing import Any -from azure.identity import AzureCliCredential from agent_framework.azure import AzureOpenAIChatClient from agent_framework.azurefunctions import AgentFunctionApp - -logger = logging.getLogger(__name__) - - -AZURE_OPENAI_ENDPOINT_ENV = "AZURE_OPENAI_ENDPOINT" -AZURE_OPENAI_DEPLOYMENT_ENV = "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME" -AZURE_OPENAI_API_KEY_ENV = "AZURE_OPENAI_API_KEY" - - -def _build_client_kwargs() -> dict[str, Any]: - """Construct Azure OpenAI client options.""" - - endpoint = os.getenv(AZURE_OPENAI_ENDPOINT_ENV) - if not endpoint: - raise RuntimeError(f"{AZURE_OPENAI_ENDPOINT_ENV} environment variable is required.") - - deployment = os.getenv(AZURE_OPENAI_DEPLOYMENT_ENV) - if not deployment: - raise RuntimeError(f"{AZURE_OPENAI_DEPLOYMENT_ENV} environment variable is required.") - - logger.info("[SingleAgent] Using deployment '%s' at '%s'", deployment, endpoint) - - client_kwargs: dict[str, Any] = { - "endpoint": endpoint, - "deployment_name": deployment, - } - - api_key = os.getenv(AZURE_OPENAI_API_KEY_ENV) - if api_key: - client_kwargs["api_key"] = api_key - else: - client_kwargs["credential"] = AzureCliCredential() - - return client_kwargs - - +# 1. Instantiate the agent with the chosen deployment and instructions. def _create_agent() -> Any: """Create the Joker agent.""" - client_kwargs = _build_client_kwargs() - return AzureOpenAIChatClient(**client_kwargs).create_agent( + return AzureOpenAIChatClient().create_agent( name="Joker", instructions="You are good at telling jokes.", ) +# 2. Register the agent with AgentFunctionApp so Azure Functions exposes the required triggers. app = AgentFunctionApp(agents=[_create_agent()], enable_health_check=True) + +""" +Expected output when invoking `POST /api/agents/Joker/run` with plain-text input: + +HTTP/1.1 202 Accepted +{ + "status": "accepted", + "response": "Agent request accepted", + "message": "Tell me a short joke about cloud computing.", + "conversation_id": "", + "correlation_id": "" +} +""" diff --git a/python/samples/getting_started/azure_functions/01_single_agent/local.settings.json b/python/samples/getting_started/azure_functions/01_single_agent/local.settings.json.template similarity index 100% rename from python/samples/getting_started/azure_functions/01_single_agent/local.settings.json rename to python/samples/getting_started/azure_functions/01_single_agent/local.settings.json.template diff --git a/python/samples/getting_started/azure_functions/02_multi_agent/README.md b/python/samples/getting_started/azure_functions/02_multi_agent/README.md new file mode 100644 index 0000000000..14dfd5744c --- /dev/null +++ b/python/samples/getting_started/azure_functions/02_multi_agent/README.md @@ -0,0 +1,146 @@ +# Multi-Agent Sample + +This sample demonstrates how to use the Durable Extension for Agent Framework to create an Azure Functions app that hosts multiple AI agents and provides direct HTTP API access for interactive conversations with each agent. + +## Key Concepts Demonstrated + +- Using the Microsoft Agent Framework to define multiple AI agents with unique names and instructions. +- Registering multiple agents with the Function app and running them using HTTP. +- Conversation management (via session IDs) for isolated interactions per agent. +- Two different methods for registering agents: list-based initialization and incremental addition. + +## Environment Setup + +### 1. Create and activate a virtual environment + +**Windows (PowerShell):** +```powershell +python -m venv .venv +.venv\Scripts\Activate.ps1 +``` + +**Linux/macOS:** +```bash +python -m venv .venv +source .venv/bin/activate +``` + +### 2. Install dependencies + +See the [README.md](../README.md) file in the parent directory for more information on how to configure the environment, including how to install and run common sample dependencies. + +### 3. Configure local settings + +Copy `local.settings.json.template` to `local.settings.json`, then set the Azure OpenAI values (`AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`, and optionally `AZURE_OPENAI_API_KEY`) to match your environment. + +## Running the Sample + +With the environment setup and function app running, you can test the sample by sending HTTP requests to the different agent endpoints. + +You can use the `demo.http` file to send messages to the agents, or a command line tool like `curl` as shown below: + +### Test the Weather Agent + +Bash (Linux/macOS/WSL): + +```bash +curl -X POST http://localhost:7071/api/agents/WeatherAgent/run \ + -H "Content-Type: application/json" \ + -d '{"message": "What is the weather in Seattle?"}' +``` + +PowerShell: + +```powershell +Invoke-RestMethod -Method Post ` + -Uri http://localhost:7071/api/agents/WeatherAgent/run ` + -ContentType application/json ` + -Body '{"message": "What is the weather in Seattle?"}' +``` + +Expected response: +```json +{ + "status": "accepted", + "response": "Agent request accepted", + "message": "What is the weather in Seattle?", + "conversation_id": "", + "correlation_id": "" +} +``` + +### Test the Math Agent + +Bash (Linux/macOS/WSL): + +```bash +curl -X POST http://localhost:7071/api/agents/MathAgent/run \ + -H "Content-Type: application/json" \ + -d '{"message": "Calculate a 20% tip on a $50 bill"}' +``` + +PowerShell: + +```powershell +Invoke-RestMethod -Method Post ` + -Uri http://localhost:7071/api/agents/MathAgent/run ` + -ContentType application/json ` + -Body '{"message": "Calculate a 20% tip on a $50 bill"}' +``` + +Expected response: +```json +{ + "status": "accepted", + "response": "Agent request accepted", + "message": "Calculate a 20% tip on a $50 bill", + "conversation_id": "", + "correlation_id": "" +} +``` + +### Check Health + +Bash (Linux/macOS/WSL): + +```bash +curl http://localhost:7071/api/health +``` + +PowerShell: + +```powershell +Invoke-RestMethod -Uri http://localhost:7071/api/health +``` + +Expected response: +```json +{ + "status": "healthy", + "agents": [ + {"name": "WeatherAgent", "type": "ChatAgent"}, + {"name": "MathAgent", "type": "ChatAgent"} + ], + "agent_count": 2 +} +``` + +## Code Structure + +The sample demonstrates two ways to register multiple agents: + +### Option 1: Pass list of agents during initialization +```python +app = AgentFunctionApp(agents=[weather_agent, math_agent]) +``` + +### Option 2: Add agents incrementally (commented in sample) +```python +app = AgentFunctionApp() +app.add_agent(weather_agent) +app.add_agent(math_agent) +``` + +Each agent automatically gets: +- `POST /api/agents/{agent_name}/run` - Send messages to the agent + diff --git a/python/samples/getting_started/azure_functions/02_multi_agent/demo.http b/python/samples/getting_started/azure_functions/02_multi_agent/demo.http new file mode 100644 index 0000000000..c685a01902 --- /dev/null +++ b/python/samples/getting_started/azure_functions/02_multi_agent/demo.http @@ -0,0 +1,58 @@ +### DAFx Multi-Agent Function App - HTTP Samples +### Use with the VS Code REST Client extension or any HTTP client +### +### API Structure: +### - POST /api/agents/{agentName}/run -> Send a message to an agent +### - GET /api/health -> Health check and agent metadata + +### Variables +@baseUrl = http://localhost:7071 +@weatherAgentName = WeatherAgent +@mathAgentName = MathAgent +@weatherAgentRoute = {{baseUrl}}/api/agents/{{weatherAgentName}} +@mathAgentRoute = {{baseUrl}}/api/agents/{{mathAgentName}} +@healthRoute = {{baseUrl}}/api/health + +### Health Check +# Confirms the Azure Functions app is running and both agents are registered +# Expected response: +# { +# "status": "healthy", +# "agents": [ +# {"name": "WeatherAgent", "type": "AzureOpenAIAssistantsAgent"}, +# {"name": "MathAgent", "type": "AzureOpenAIAssistantsAgent"} +# ], +# "agent_count": 2 +# } +GET {{healthRoute}} + +### + +### Weather Agent - Current Conditions +# Tests the Weather agent's tool-assisted response path +# Expected response: { "response": "The weather in Seattle...", "status": "success" } +POST {{weatherAgentRoute}}/run +Content-Type: application/json + +{ + "message": "What is the weather in Seattle?", + "sessionId": "weather-user-001", + "waitForCompletion": true +} + +### + + +### Math Agent - Tip Calculation +# Exercises the Math agent with a calculation request +# Expected response: { "response": "A 20% tip on a $50 bill is $10...", "status": "success" } +POST {{mathAgentRoute}}/run +Content-Type: application/json + +{ + "message": "Calculate a 20% tip on a $50 bill", + "sessionId": "math-user-001" +} + +### + diff --git a/python/samples/getting_started/azure_functions/02_multi_agent/function_app.py b/python/samples/getting_started/azure_functions/02_multi_agent/function_app.py new file mode 100644 index 0000000000..dfede85d0f --- /dev/null +++ b/python/samples/getting_started/azure_functions/02_multi_agent/function_app.py @@ -0,0 +1,99 @@ +"""Host multiple Azure OpenAI agents inside a single Azure Functions app. + +Components used in this sample: +- AzureOpenAIChatClient to create agents bound to a shared Azure OpenAI deployment. +- AgentFunctionApp to register multiple agents and expose dedicated HTTP endpoints. +- Custom tool functions to demonstrate tool invocation from different agents. + +Prerequisites: set `AZURE_OPENAI_ENDPOINT` and `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`, plus either +`AZURE_OPENAI_API_KEY` or authenticate with Azure CLI before starting the Functions host.""" + +import logging +from typing import Any + +from agent_framework.azure import AzureOpenAIChatClient +from agent_framework.azurefunctions import AgentFunctionApp + + +logger = logging.getLogger(__name__) + + +def get_weather(location: str) -> dict[str, Any]: + """Get current weather for a location.""" + + logger.info(f"🔧 [TOOL CALLED] get_weather(location={location})") + result = { + "location": location, + "temperature": 72, + "conditions": "Sunny", + "humidity": 45, + } + logger.info(f"✓ [TOOL RESULT] {result}") + return result + + +def calculate_tip(bill_amount: float, tip_percentage: float = 15.0) -> dict[str, Any]: + """Calculate tip amount and total bill.""" + + logger.info( + f"🔧 [TOOL CALLED] calculate_tip(bill_amount={bill_amount}, tip_percentage={tip_percentage})" + ) + tip = bill_amount * (tip_percentage / 100) + total = bill_amount + tip + result = { + "bill_amount": bill_amount, + "tip_percentage": tip_percentage, + "tip_amount": round(tip, 2), + "total": round(total, 2), + } + logger.info(f"✓ [TOOL RESULT] {result}") + return result + + +# 1. Create multiple agents, each with its own instruction set and tools. +chat_client = AzureOpenAIChatClient() + +weather_agent = chat_client.create_agent( + name="WeatherAgent", + instructions="You are a helpful weather assistant. Provide current weather information.", + tools=[get_weather], +) + +math_agent = chat_client.create_agent( + name="MathAgent", + instructions="You are a helpful math assistant. Help users with calculations like tip calculations.", + tools=[calculate_tip], +) + + +# 2. Register both agents with AgentFunctionApp to expose their HTTP routes and health check. +app = AgentFunctionApp(agents=[weather_agent, math_agent], enable_health_check=True) + +# Option 2: Add agents after initialization (commented out as we're using Option 1) +# app = AgentFunctionApp(enable_health_check=True) +# app.add_agent(weather_agent) +# app.add_agent(math_agent) + +""" +Expected output when invoking `POST /api/agents/WeatherAgent/run`: + +HTTP/1.1 202 Accepted +{ + "status": "accepted", + "response": "Agent request accepted", + "message": "What is the weather in Seattle?", + "conversation_id": "", + "correlation_id": "" +} + +Expected output when invoking `POST /api/agents/MathAgent/run`: + +HTTP/1.1 202 Accepted +{ + "status": "accepted", + "response": "Agent request accepted", + "message": "Calculate a 20% tip on a $50 bill", + "conversation_id": "", + "correlation_id": "" +} +""" diff --git a/python/samples/getting_started/azure_functions/02_multi_agent/host.json b/python/samples/getting_started/azure_functions/02_multi_agent/host.json new file mode 100644 index 0000000000..d1a0a92006 --- /dev/null +++ b/python/samples/getting_started/azure_functions/02_multi_agent/host.json @@ -0,0 +1,15 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "maxTelemetryItemsPerSecond": 20 + } + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle", + "version": "[4.*, 5.0.0)" + } +} diff --git a/python/samples/getting_started/azure_functions/02_multi_agent/local.settings.json.template b/python/samples/getting_started/azure_functions/02_multi_agent/local.settings.json.template new file mode 100644 index 0000000000..6c98a7d1cb --- /dev/null +++ b/python/samples/getting_started/azure_functions/02_multi_agent/local.settings.json.template @@ -0,0 +1,10 @@ +{ + "IsEncrypted": false, + "Values": { + "FUNCTIONS_WORKER_RUNTIME": "python", + "AzureWebJobsStorage": "UseDevelopmentStorage=true", + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None", + "AZURE_OPENAI_ENDPOINT": "", + "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "" + } +} diff --git a/python/samples/getting_started/azure_functions/02_multi_agent/requirements.txt b/python/samples/getting_started/azure_functions/02_multi_agent/requirements.txt new file mode 100644 index 0000000000..8aa2c75d80 --- /dev/null +++ b/python/samples/getting_started/azure_functions/02_multi_agent/requirements.txt @@ -0,0 +1,2 @@ +agent-framework-azurefunctions +azure-identity \ No newline at end of file diff --git a/python/samples/getting_started/azure_functions/03_callbacks/README.md b/python/samples/getting_started/azure_functions/03_callbacks/README.md new file mode 100644 index 0000000000..979a31b2ec --- /dev/null +++ b/python/samples/getting_started/azure_functions/03_callbacks/README.md @@ -0,0 +1,109 @@ +# Callback Telemetry Sample + +This sample demonstrates how to use the Durable Extension for Agent Framework's response callbacks to observe +streaming updates and final agent responses in real time. The `ConversationAuditTrail` callback +records each chunk received from the Azure OpenAI agent and exposes the collected events through +an HTTP API that can be polled by a web client or dashboard. + +## Highlights + +- Registers a default `AgentResponseCallbackProtocol` implementation that logs streaming and final + responses. +- Persists callback events in an in-memory store and exposes them via + `GET /api/agents/{agentName}/callbacks/{conversationId}`. +- Shows how to reset stored callback events with `DELETE /api/agents/{agentName}/callbacks/{conversationId}`. +- Works alongside the standard `/api/agents/{agentName}/run` endpoint so you can correlate callback + telemetry with agent responses. + +## Prerequisites + +- Python 3.11+ +- Azure Functions Core Tools v4 +- Access to an Azure OpenAI deployment (configure the environment variables listed in + `local.settings.json` or export them in your shell) +- Dependencies from `requirements.txt` installed in your environment + +> **Note:** The sample stores callback events in memory for simplicity. For production scenarios you +> should persist events to Application Insights, Azure Storage, Cosmos DB, or another durable store. + +## Running the Sample + +1. Create and activate a virtual environment: + + **Windows (PowerShell):** + ```powershell + python -m venv .venv + .venv\Scripts\Activate.ps1 + ``` + + **Linux/macOS:** + ```bash + python -m venv .venv + source .venv/bin/activate + ``` + +2. Install dependencies (from the repository root or this directory): + + ```powershell + pip install -r requirements.txt + ``` + +3. Copy `local.settings.json.template` to `local.settings.json` and update the values (or export them as environment variables) with your Azure resources. + +4. Start the Functions host: + + ```powershell + func start + ``` + +5. Use the [`demo.http`](./demo.http) file (VS Code REST Client) or any HTTP client to: + - Send a message to the agent: `POST /api/agents/CallbackAgent/run` + - Query callback telemetry: `GET /api/agents/CallbackAgent/callbacks/{conversationId}` + - Clear stored events: `DELETE /api/agents/CallbackAgent/callbacks/{conversationId}` + +Example workflow after the host starts: + +```text +POST /api/agents/CallbackAgent/run # send a conversation message +GET /api/agents/CallbackAgent/callbacks/test-session # inspect streaming + final events +DELETE /api/agents/CallbackAgent/callbacks/test-session # reset telemetry for the session +``` + +The GET endpoint returns an array of events captured by the callback, including timestamps, +streaming chunk previews, and the final response metadata. This makes it easy to build real-time +UI updates or audit logs on top of Durable Agents. + +## Expected Output + +When you call `GET /api/agents/CallbackAgent/callbacks/{conversationId}` after sending a request to the agent, +the API returns a list of streaming and final callback events similar to the following: + +```json +[ + { + "timestamp": "2024-01-01T00:00:00Z", + "agent_name": "CallbackAgent", + "conversation_id": "", + "correlation_id": "", + "request_message": "Tell me a short joke", + "event_type": "stream", + "update_kind": "text", + "text": "Sure, here's a joke..." + }, + { + "timestamp": "2024-01-01T00:00:01Z", + "agent_name": "CallbackAgent", + "conversation_id": "", + "correlation_id": "", + "request_message": "Tell me a short joke", + "event_type": "final", + "response_text": "Why did the cloud...", + "usage": { + "type": "usage_details", + "input_token_count": 159, + "output_token_count": 29, + "total_token_count": 188 + } + } +] +``` diff --git a/python/samples/getting_started/azure_functions/03_callbacks/demo.http b/python/samples/getting_started/azure_functions/03_callbacks/demo.http new file mode 100644 index 0000000000..9f7e9efec6 --- /dev/null +++ b/python/samples/getting_started/azure_functions/03_callbacks/demo.http @@ -0,0 +1,30 @@ +### Callback Sample - API Tests +### Use with VS Code REST Client or another HTTP testing tool. +### +### Endpoints introduced in this sample: +### - POST /api/agents/{agentName}/run : send a message to the agent +### - GET /api/agents/{agentName}/callbacks/{conversationId} : retrieve callback telemetry +### - DELETE /api/agents/{agentName}/callbacks/{conversationId} : clear stored callback events + +@baseUrl = http://localhost:7071 +@agentName = CallbackAgent +@agentRoute = {{baseUrl}}/api/agents/{{agentName}} +@conversationId = test-stream-00 + +### Health Check +GET {{baseUrl}}/api/health + +### Send message (callbacks will capture streaming + final response) +POST {{agentRoute}}/run +Content-Type: application/json + +{ + "message": "Generate a short weather update for Paris and mention streaming callbacks.", + "sessionId": "{{conversationId}}" +} + +### Inspect callback telemetry +GET {{agentRoute}}/callbacks/{{conversationId}} + +### Clear stored callback telemetry for the conversation +DELETE {{agentRoute}}/callbacks/{{conversationId}} diff --git a/python/samples/getting_started/azure_functions/03_callbacks/function_app.py b/python/samples/getting_started/azure_functions/03_callbacks/function_app.py new file mode 100644 index 0000000000..ffbb2f387a --- /dev/null +++ b/python/samples/getting_started/azure_functions/03_callbacks/function_app.py @@ -0,0 +1,178 @@ +"""Capture agent response callbacks inside Azure Functions. + +Components used in this sample: +- AzureOpenAIChatClient to build an agent that streams interim updates. +- AgentFunctionApp with a default AgentResponseCallbackProtocol implementation. +- Azure Functions HTTP triggers that expose callback telemetry via REST. + +Prerequisites: set `AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`, and either +`AZURE_OPENAI_API_KEY` or authenticate with Azure CLI before starting the Functions host.""" + +import json +import logging +from collections import defaultdict +from datetime import datetime, timezone +from typing import Any, DefaultDict + +import azure.functions as func +from agent_framework import AgentRunResponseUpdate +from agent_framework.azure import AzureOpenAIChatClient + +from agent_framework.azurefunctions import AgentFunctionApp, AgentCallbackContext, AgentResponseCallbackProtocol + +logger = logging.getLogger(__name__) + + +# 1. Maintain an in-memory store for callback events (replace with durable storage in production). +CallbackStore = DefaultDict[str, list[dict[str, Any]]] +callback_events: CallbackStore = defaultdict(list) + + +def _serialize_usage(usage: Any) -> Any: + """Best-effort serialization for agent usage metadata.""" + + if usage is None: + return None + + model_dump = getattr(usage, "model_dump", None) + if callable(model_dump): + return model_dump() + + to_dict = getattr(usage, "to_dict", None) + if callable(to_dict): + return to_dict() + + return str(usage) + + +class ConversationAuditTrail(AgentResponseCallbackProtocol): + """Callback that records streaming chunks and final responses for later inspection.""" + + def __init__(self) -> None: + self._logger = logging.getLogger("durableagent.samples.callbacks.audit") + + async def on_streaming_response_update( + self, + update: AgentRunResponseUpdate, + context: AgentCallbackContext, + ) -> None: + event = self._build_base_event(context) + event.update( + { + "event_type": "stream", + "update_kind": getattr(update, "kind", "text"), + "text": getattr(update, "text", None), + } + ) + conversation_id = context.conversation_id or "" + callback_events[conversation_id].append(event) + + preview = event.get("text") or event.get("update_kind") + self._logger.info( + "[%s][%s] streaming chunk: %s", + context.agent_name, + context.correlation_id, + preview, + ) + + async def on_agent_response(self, response, context: AgentCallbackContext) -> None: + event = self._build_base_event(context) + event.update( + { + "event_type": "final", + "response_text": getattr(response, "text", None), + "usage": _serialize_usage(getattr(response, "usage_details", None)), + } + ) + conversation_id = context.conversation_id or "" + callback_events[conversation_id].append(event) + + self._logger.info( + "[%s][%s] final response recorded", + context.agent_name, + context.correlation_id, + ) + + @staticmethod + def _build_base_event(context: AgentCallbackContext) -> dict[str, Any]: + return { + "timestamp": datetime.now(timezone.utc).isoformat(), + "agent_name": context.agent_name, + "conversation_id": context.conversation_id, + "correlation_id": context.correlation_id, + "request_message": context.request_message, + } + + +# 2. Create the agent that will emit streaming updates and final responses. +callback_agent = AzureOpenAIChatClient().create_agent( + name="CallbackAgent", + instructions=( + "You are a friendly assistant that narrates actions while responding. " + "Keep answers concise and acknowledge when callbacks capture streaming updates." + ), +) + + +# 3. Register the agent inside AgentFunctionApp with a default callback instance. +audit_callback = ConversationAuditTrail() +app = AgentFunctionApp(enable_health_check=True, default_callback=audit_callback) +app.add_agent(callback_agent) + + +@app.function_name("get_callback_events") +@app.route(route="agents/{agent_name}/callbacks/{conversationId}", methods=["GET"]) +async def get_callback_events(req: func.HttpRequest) -> func.HttpResponse: + """Return all callback events collected for a conversation.""" + + conversation_id = req.route_params.get("conversationId", "") + events = callback_events.get(conversation_id, []) + return func.HttpResponse( + json.dumps(events, indent=2), + status_code=200, + mimetype="application/json", + ) + + +@app.function_name("reset_callback_events") +@app.route(route="agents/{agent_name}/callbacks/{conversationId}", methods=["DELETE"]) +async def reset_callback_events(req: func.HttpRequest) -> func.HttpResponse: + """Clear the stored callback events for a conversation.""" + + conversation_id = req.route_params.get("conversationId", "") + callback_events.pop(conversation_id, None) + return func.HttpResponse(status_code=204) + + +""" +Expected output when querying `GET /api/agents/CallbackAgent/callbacks/{conversationId}`: + +HTTP/1.1 200 OK +[ + { + "timestamp": "2024-01-01T00:00:00Z", + "agent_name": "CallbackAgent", + "conversation_id": "", + "correlation_id": "", + "request_message": "Tell me a short joke", + "event_type": "stream", + "update_kind": "text", + "text": "Sure, here's a joke..." + }, + { + "timestamp": "2024-01-01T00:00:01Z", + "agent_name": "CallbackAgent", + "conversation_id": "", + "correlation_id": "", + "request_message": "Tell me a short joke", + "event_type": "final", + "response_text": "Why did the cloud...", + "usage": { + "type": "usage_details", + "input_token_count": 159, + "output_token_count": 29, + "total_token_count": 188 + } + } +] +""" diff --git a/python/samples/getting_started/azure_functions/03_callbacks/host.json b/python/samples/getting_started/azure_functions/03_callbacks/host.json new file mode 100644 index 0000000000..d1a0a92006 --- /dev/null +++ b/python/samples/getting_started/azure_functions/03_callbacks/host.json @@ -0,0 +1,15 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "maxTelemetryItemsPerSecond": 20 + } + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle", + "version": "[4.*, 5.0.0)" + } +} diff --git a/python/samples/getting_started/azure_functions/03_callbacks/local.settings.json.template b/python/samples/getting_started/azure_functions/03_callbacks/local.settings.json.template new file mode 100644 index 0000000000..6c98a7d1cb --- /dev/null +++ b/python/samples/getting_started/azure_functions/03_callbacks/local.settings.json.template @@ -0,0 +1,10 @@ +{ + "IsEncrypted": false, + "Values": { + "FUNCTIONS_WORKER_RUNTIME": "python", + "AzureWebJobsStorage": "UseDevelopmentStorage=true", + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None", + "AZURE_OPENAI_ENDPOINT": "", + "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "" + } +} diff --git a/python/samples/getting_started/azure_functions/03_callbacks/requirements.txt b/python/samples/getting_started/azure_functions/03_callbacks/requirements.txt new file mode 100644 index 0000000000..8aa2c75d80 --- /dev/null +++ b/python/samples/getting_started/azure_functions/03_callbacks/requirements.txt @@ -0,0 +1,2 @@ +agent-framework-azurefunctions +azure-identity \ No newline at end of file diff --git a/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/README.md b/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/README.md new file mode 100644 index 0000000000..816cea8fcb --- /dev/null +++ b/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/README.md @@ -0,0 +1,56 @@ +# Single Agent Orchestration Sample (Python) + +This sample shows how to chain two invocations of the same agent inside a Durable Functions orchestration while +preserving the conversation state between runs. + +## Key Concepts +- Deterministic orchestrations that make sequential agent calls on a shared thread +- Reusing an agent thread to carry conversation history across invocations +- HTTP endpoints for starting the orchestration and polling for status/output + +## Prerequisites +- Python 3.11+ +- Azure Functions Core Tools v4 +- Local Azure Storage / Azurite and the Durable Task sidecar running +- Environment variables configured: + - `AZURE_OPENAI_ENDPOINT` + - `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME` + - `AZURE_OPENAI_API_KEY` (omit if using Azure CLI authentication) +- Copy `local.settings.json.template` to `local.settings.json` and populate those keys (and any storage settings) before running the Functions host. +- Dependencies installed: `pip install -r requirements.txt` + +## Running the Sample +1. Start the Functions host: `func start`. +2. Kick off the orchestration: + ```bash + curl -X POST http://localhost:7071/api/singleagent/run + ``` +3. Copy the `statusQueryGetUri` from the response and poll until the orchestration completes: + ```bash + curl http://localhost:7071/api/singleagent/status/ + ``` + +The orchestration first requests an inspirational sentence from the agent, then refines the initial response while +keeping it under 25 words—mirroring the behaviour of the corresponding .NET sample. + +## Expected Output + +Sample response when starting the orchestration: + +```json +{ + "message": "Single-agent orchestration started.", + "instanceId": "ebb5c1df123e4d6fb8e7d703ffd0d0b0", + "statusQueryGetUri": "http://localhost:7071/api/singleagent/status/ebb5c1df123e4d6fb8e7d703ffd0d0b0" +} +``` + +Sample completed status payload: + +```json +{ + "instanceId": "ebb5c1df123e4d6fb8e7d703ffd0d0b0", + "runtimeStatus": "Completed", + "output": "Learning is a journey where curiosity turns effort into mastery." +} +``` diff --git a/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/demo.http b/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/demo.http new file mode 100644 index 0000000000..74a45538c6 --- /dev/null +++ b/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/demo.http @@ -0,0 +1,9 @@ +### Start the single-agent orchestration +POST http://localhost:7071/api/singleagent/run + + +### Check the status of the orchestration + +@instanceId = + +GET http://localhost:7071/api/singleagent/status/{{instanceId}} \ No newline at end of file diff --git a/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/function_app.py b/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/function_app.py new file mode 100644 index 0000000000..e271082e54 --- /dev/null +++ b/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/function_app.py @@ -0,0 +1,174 @@ +"""Chain two runs of a single agent inside a Durable Functions orchestration. + +Components used in this sample: +- AzureOpenAIChatClient to construct the writer agent hosted by Agent Framework. +- AgentFunctionApp to surface HTTP and orchestration triggers via the Azure Functions extension. +- Durable Functions orchestration to run sequential agent invocations on the same conversation thread. + +Prerequisites: configure `AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`, and either +`AZURE_OPENAI_API_KEY` or authenticate with Azure CLI before starting the Functions host.""" + +import json +import logging +from typing import Any + +import azure.durable_functions as df +import azure.functions as func +from agent_framework.azure import AzureOpenAIChatClient +from azure.durable_functions import DurableOrchestrationContext +from agent_framework.azurefunctions import AgentFunctionApp, get_agent + +logger = logging.getLogger(__name__) + +# 1. Define the agent name used across the orchestration. +WRITER_AGENT_NAME = "WriterAgent" + + +# 2. Create the writer agent that will be invoked twice within the orchestration. +def _create_writer_agent() -> Any: + """Create the writer agent with the same persona as the C# sample.""" + + instructions = ( + "You refine short pieces of text. When given an initial sentence you enhance it;\n" + "when given an improved sentence you polish it further." + ) + + return AzureOpenAIChatClient().create_agent( + name=WRITER_AGENT_NAME, + instructions=instructions, + ) + + +# 3. Register the agent with AgentFunctionApp so HTTP and orchestration triggers are exposed. +app = AgentFunctionApp(agents=[_create_writer_agent()], enable_health_check=True) + + +# 4. Orchestration that runs the agent sequentially on a shared thread for chaining behaviour. +@app.orchestration_trigger(context_name="context") +def single_agent_orchestration(context: DurableOrchestrationContext): + """Run the writer agent twice on the same thread to mirror chaining behaviour.""" + + writer = get_agent(context, WRITER_AGENT_NAME) + writer_thread = writer.get_new_thread() + + initial = yield writer.run( + messages="Write a concise inspirational sentence about learning.", + thread=writer_thread, + ) + + improved_prompt = ( + "Improve this further while keeping it under 25 words: " + f"{initial.get('response', '').strip()}" + ) + + refined = yield writer.run( + messages=improved_prompt, + thread=writer_thread, + ) + + return refined.get("response", "") + + +# 5. HTTP endpoint to kick off the orchestration and return the status query URI. +@app.route(route="singleagent/run", methods=["POST"]) +@app.durable_client_input(client_name="client") +async def start_single_agent_orchestration( + req: func.HttpRequest, + client: df.DurableOrchestrationClient, +) -> func.HttpResponse: + """Start the orchestration and return status metadata.""" + + instance_id = await client.start_new( + orchestration_function_name="single_agent_orchestration", + ) + + logger.info("[HTTP] Started orchestration with instance_id: %s", instance_id) + + status_url = _build_status_url(req.url, instance_id, route="singleagent") + + payload = { + "message": "Single-agent orchestration started.", + "instanceId": instance_id, + "statusQueryGetUri": status_url, + } + + return func.HttpResponse( + body=json.dumps(payload), + status_code=202, + mimetype="application/json", + ) + + +# 6. HTTP endpoint to fetch orchestration status using the original instance ID. +@app.route(route="singleagent/status/{instanceId}", methods=["GET"]) +@app.durable_client_input(client_name="client") +async def get_orchestration_status( + req: func.HttpRequest, + client: df.DurableOrchestrationClient, +) -> func.HttpResponse: + """Return orchestration runtime status.""" + + instance_id = req.route_params.get("instanceId") + if not instance_id: + return func.HttpResponse( + body=json.dumps({"error": "Missing instanceId"}), + status_code=400, + mimetype="application/json", + ) + + status = await client.get_status(instance_id) + if status is None: + return func.HttpResponse( + body=json.dumps({"error": "Instance not found"}), + status_code=404, + mimetype="application/json", + ) + + response_data: dict[str, Any] = { + "instanceId": status.instance_id, + "runtimeStatus": status.runtime_status.name if status.runtime_status else None, + } + + if status.input_ is not None: + response_data["input"] = status.input_ + + if status.output is not None: + response_data["output"] = status.output + + return func.HttpResponse( + body=json.dumps(response_data), + status_code=200, + mimetype="application/json", + ) + + +# 7. Helper to construct durable status URLs similar to the .NET sample implementation. +def _build_status_url(request_url: str, instance_id: str, *, route: str) -> str: + """Construct the status query URI similar to DurableHttpApiExtensions in C#.""" + + # Split once on /api/ to preserve host and scheme in local emulator and Azure. + base_url, _, _ = request_url.partition("/api/") + if not base_url: + base_url = request_url.rstrip("/") + return f"{base_url}/api/{route}/status/{instance_id}" + + +""" +Expected output when calling `POST /api/singleagent/run` and following the returned status URL: + +HTTP/1.1 202 Accepted +{ + "message": "Single-agent orchestration started.", + "instanceId": "", + "statusQueryGetUri": "http://localhost:7071/api/singleagent/status/" +} + +Subsequent `GET /api/singleagent/status/` after completion returns: + +HTTP/1.1 200 OK +{ + "instanceId": "", + "runtimeStatus": "Completed", + "output": "Learning is a journey where curiosity turns effort into mastery." +} +""" diff --git a/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/host.json b/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/host.json new file mode 100644 index 0000000000..4ef61f4578 --- /dev/null +++ b/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/host.json @@ -0,0 +1,23 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "maxTelemetryItemsPerSecond": 20 + } + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview", + "version": "[4.*, 5.0.0)" + }, + "extensions": { + "durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING" + } + } + } +} diff --git a/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/local.settings.json.template b/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/local.settings.json.template new file mode 100644 index 0000000000..6c98a7d1cb --- /dev/null +++ b/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/local.settings.json.template @@ -0,0 +1,10 @@ +{ + "IsEncrypted": false, + "Values": { + "FUNCTIONS_WORKER_RUNTIME": "python", + "AzureWebJobsStorage": "UseDevelopmentStorage=true", + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None", + "AZURE_OPENAI_ENDPOINT": "", + "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "" + } +} diff --git a/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/requirements.txt b/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/requirements.txt new file mode 100644 index 0000000000..39ad8a124f --- /dev/null +++ b/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/requirements.txt @@ -0,0 +1,2 @@ +agent-framework-azurefunctions +azure-identity diff --git a/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/README.md b/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/README.md new file mode 100644 index 0000000000..5d7f421eff --- /dev/null +++ b/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/README.md @@ -0,0 +1,61 @@ +# Multi-Agent Orchestration (Concurrency) – Python + +This sample starts a Durable Functions orchestration that runs two agents in parallel and merges their responses. + +## Highlights +- Two agents (`PhysicistAgent` and `ChemistAgent`) share a single Azure OpenAI deployment configuration. +- The orchestration uses `context.task_all(...)` to safely run both agents concurrently. +- HTTP routes (`/api/multiagent/run` and `/api/multiagent/status/{instanceId}`) mirror the .NET sample for parity. + +## Prerequisites +- Python 3.11+ +- Azure Functions Core Tools v4 +- Azurite / Azure Storage emulator and Durable Task sidecar running locally +- Environment variables configured: + - `AZURE_OPENAI_ENDPOINT` + - `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME` + - `AZURE_OPENAI_API_KEY` (omit when using Azure CLI auth) +- Copy `local.settings.json.template` to `local.settings.json` and fill in those Azure OpenAI values (and storage settings) before starting the Functions host. +- Install dependencies: `pip install -r requirements.txt` + +## Running the Sample +1. Start the Functions host: `func start`. +2. Send a prompt to start the orchestration: + ```bash + curl -X POST \ + -H "Content-Type: text/plain" \ + --data "What is temperature?" \ + http://localhost:7071/api/multiagent/run + ``` +3. Poll the returned `statusQueryGetUri` until the orchestration completes: + ```bash + curl http://localhost:7071/api/multiagent/status/ + ``` + +The orchestration launches both agents simultaneously so their domain-specific answers can be combined for the caller. + +## Expected Output + +Example response when starting the orchestration: + +```json +{ + "message": "Multi-agent concurrent orchestration started.", + "prompt": "What is temperature?", + "instanceId": "94d56266f0a04e5a8f9f3a1f77a4c597", + "statusQueryGetUri": "http://localhost:7071/api/multiagent/status/94d56266f0a04e5a8f9f3a1f77a4c597" +} +``` + +Example completed status payload: + +```json +{ + "instanceId": "94d56266f0a04e5a8f9f3a1f77a4c597", + "runtimeStatus": "Completed", + "output": { + "physicist": "Temperature measures the average kinetic energy of particles in a system.", + "chemist": "Temperature reflects how molecular motion influences reaction rates and equilibria." + } +} +``` diff --git a/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/demo.http b/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/demo.http new file mode 100644 index 0000000000..28f3cdc283 --- /dev/null +++ b/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/demo.http @@ -0,0 +1,11 @@ +### Start the multi-agent concurrent orchestration +POST http://localhost:7071/api/multiagent/run +Content-Type: text/plain + +What is temperature? + +### Check the status of the orchestration + +@instanceId = + +GET http://localhost:7071/api/multiagent/status/{{instanceId}} diff --git a/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/function_app.py b/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/function_app.py new file mode 100644 index 0000000000..fe3e558364 --- /dev/null +++ b/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/function_app.py @@ -0,0 +1,192 @@ +"""Fan out concurrent runs across two agents inside a Durable Functions orchestration. + +Components used in this sample: +- AzureOpenAIChatClient to create domain-specific agents hosted by Agent Framework. +- AgentFunctionApp to expose orchestration and HTTP triggers. +- Durable Functions orchestration that executes agent calls in parallel and aggregates results. + +Prerequisites: configure `AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`, and either +`AZURE_OPENAI_API_KEY` or authenticate with Azure CLI before starting the Functions host.""" + +import json +import logging +from typing import Any + +import azure.durable_functions as df +import azure.functions as func +from agent_framework.azure import AzureOpenAIChatClient +from azure.durable_functions import DurableOrchestrationContext +from agent_framework.azurefunctions import AgentFunctionApp, get_agent + +logger = logging.getLogger(__name__) + +# 1. Define agent names shared across the orchestration. +PHYSICIST_AGENT_NAME = "PhysicistAgent" +CHEMIST_AGENT_NAME = "ChemistAgent" + + +# 2. Instantiate both agents that the orchestration will run concurrently. +def _create_agents() -> list[Any]: + chat_client = AzureOpenAIChatClient() + + physicist = chat_client.create_agent( + name=PHYSICIST_AGENT_NAME, + instructions="You are an expert in physics. You answer questions from a physics perspective.", + ) + + chemist = chat_client.create_agent( + name=CHEMIST_AGENT_NAME, + instructions="You are an expert in chemistry. You answer questions from a chemistry perspective.", + ) + + return [physicist, chemist] + + +# 3. Register both agents with AgentFunctionApp and selectively enable HTTP endpoints. +agents = _create_agents() +app = AgentFunctionApp(enable_health_check=True, enable_http_endpoints=False) +app.add_agent(agents[0], enable_http_endpoint=True) +app.add_agent(agents[1]) + + +# 4. Durable Functions orchestration that runs both agents in parallel. +@app.orchestration_trigger(context_name="context") +def multi_agent_concurrent_orchestration(context: DurableOrchestrationContext): + """Fan out to two domain-specific agents and aggregate their responses.""" + + prompt = context.get_input() + if not prompt or not str(prompt).strip(): + raise ValueError("Prompt is required") + + physicist = get_agent(context, PHYSICIST_AGENT_NAME) + chemist = get_agent(context, CHEMIST_AGENT_NAME) + + physicist_thread = physicist.get_new_thread() + chemist_thread = chemist.get_new_thread() + + physicist_task = physicist.run(messages=str(prompt), thread=physicist_thread) + chemist_task = chemist.run(messages=str(prompt), thread=chemist_thread) + + results = yield context.task_all([physicist_task, chemist_task]) + + return { + "physicist": results[0].get("response", ""), + "chemist": results[1].get("response", ""), + } + + +# 5. HTTP endpoint to accept prompts and start the concurrent orchestration. +@app.route(route="multiagent/run", methods=["POST"]) +@app.durable_client_input(client_name="client") +async def start_multi_agent_concurrent_orchestration( + req: func.HttpRequest, + client: df.DurableOrchestrationClient, +) -> func.HttpResponse: + """Kick off the orchestration with a plain text prompt.""" + + body_bytes = req.get_body() or b"" + prompt = body_bytes.decode("utf-8", errors="replace").strip() + if not prompt: + return func.HttpResponse( + body=json.dumps({"error": "Prompt is required"}), + status_code=400, + mimetype="application/json", + ) + + instance_id = await client.start_new( + orchestration_function_name="multi_agent_concurrent_orchestration", + client_input=prompt, + ) + + logger.info("[HTTP] Started orchestration with instance_id: %s", instance_id) + + status_url = _build_status_url(req.url, instance_id, route="multiagent") + + payload = { + "message": "Multi-agent concurrent orchestration started.", + "prompt": prompt, + "instanceId": instance_id, + "statusQueryGetUri": status_url, + } + + return func.HttpResponse( + body=json.dumps(payload), + status_code=202, + mimetype="application/json", + ) + + +# 6. HTTP endpoint to retrieve orchestration status and aggregated outputs. +@app.route(route="multiagent/status/{instanceId}", methods=["GET"]) +@app.durable_client_input(client_name="client") +async def get_orchestration_status( + req: func.HttpRequest, + client: df.DurableOrchestrationClient, +) -> func.HttpResponse: + instance_id = req.route_params.get("instanceId") + if not instance_id: + return func.HttpResponse( + body=json.dumps({"error": "Missing instanceId"}), + status_code=400, + mimetype="application/json", + ) + + status = await client.get_status(instance_id) + if status is None: + return func.HttpResponse( + body=json.dumps({"error": "Instance not found"}), + status_code=404, + mimetype="application/json", + ) + + response_data: dict[str, Any] = { + "instanceId": status.instance_id, + "runtimeStatus": status.runtime_status.name if status.runtime_status else None, + "createdTime": status.created_time.isoformat() if status.created_time else None, + "lastUpdatedTime": status.last_updated_time.isoformat() if status.last_updated_time else None, + } + + if status.input_ is not None: + response_data["input"] = status.input_ + + if status.output is not None: + response_data["output"] = status.output + + return func.HttpResponse( + body=json.dumps(response_data), + status_code=200, + mimetype="application/json", + ) + + +# 7. Helper to construct durable status URLs. +def _build_status_url(request_url: str, instance_id: str, *, route: str) -> str: + base_url, _, _ = request_url.partition("/api/") + if not base_url: + base_url = request_url.rstrip("/") + return f"{base_url}/api/{route}/status/{instance_id}" + + +""" +Expected output when calling `POST /api/multiagent/run` with a plain-text prompt: + +HTTP/1.1 202 Accepted +{ + "message": "Multi-agent concurrent orchestration started.", + "prompt": "What is temperature?", + "instanceId": "", + "statusQueryGetUri": "http://localhost:7071/api/multiagent/status/" +} + +Polling `GET /api/multiagent/status/` after completion returns: + +HTTP/1.1 200 OK +{ + "instanceId": "", + "runtimeStatus": "Completed", + "output": { + "physicist": "Temperature measures the average kinetic energy of particles in a system.", + "chemist": "Temperature reflects how molecular motion influences reaction rates and equilibria." + } +} +""" diff --git a/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/host.json b/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/host.json new file mode 100644 index 0000000000..4ef61f4578 --- /dev/null +++ b/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/host.json @@ -0,0 +1,23 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "maxTelemetryItemsPerSecond": 20 + } + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview", + "version": "[4.*, 5.0.0)" + }, + "extensions": { + "durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING" + } + } + } +} diff --git a/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/local.settings.json.template b/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/local.settings.json.template new file mode 100644 index 0000000000..6c98a7d1cb --- /dev/null +++ b/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/local.settings.json.template @@ -0,0 +1,10 @@ +{ + "IsEncrypted": false, + "Values": { + "FUNCTIONS_WORKER_RUNTIME": "python", + "AzureWebJobsStorage": "UseDevelopmentStorage=true", + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None", + "AZURE_OPENAI_ENDPOINT": "", + "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "" + } +} diff --git a/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/requirements.txt b/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/requirements.txt new file mode 100644 index 0000000000..39ad8a124f --- /dev/null +++ b/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/requirements.txt @@ -0,0 +1,2 @@ +agent-framework-azurefunctions +azure-identity diff --git a/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/README.md b/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/README.md new file mode 100644 index 0000000000..6d0c62e774 --- /dev/null +++ b/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/README.md @@ -0,0 +1,27 @@ +# Multi-Agent Orchestration (Conditionals) – Python + +This sample evaluates incoming emails with a spam detector agent and, +when appropriate, drafts a response using an email assistant agent. + +## Prerequisites +- Python 3.11 environment with dependencies from `requirements.txt` installed. +- Azure Functions Core Tools (`func`) available on the PATH. +- Environment variables `AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`, and either + `AZURE_OPENAI_API_KEY` or an active Azure CLI login. +- Copy `local.settings.json.template` to `local.settings.json` and populate those Azure OpenAI settings (and storage values) before starting the host. + +## Scenario Overview +- Two Azure OpenAI agents share a single deployment: one flags spam, the other drafts replies. +- Structured responses (`is_spam` and `reason`, or `response`) determine which orchestration branch runs. +- Activity functions handle the side effects of spam handling and email sending. + +## Run the Sample +1. Configure the environment variables and install dependencies with `pip install -r requirements.txt`. +2. Launch the Functions host from this directory using `func start`. +3. Send an email payload to `/api/spamdetection/run` (see `demo.http`). +4. Poll the provided `statusQueryGetUri` or call `/api/spamdetection/status/{instanceId}` to monitor results. + +## Expected Responses +- Spam payloads return `Email marked as spam: ` by invoking the `handle_spam_email` activity. +- Legitimate emails return `Email sent: ` after the email assistant agent produces a structured reply. +- The status endpoint mirrors Durable Functions metadata, including runtime status and the agent output. diff --git a/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/demo.http b/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/demo.http new file mode 100644 index 0000000000..44b49c5c46 --- /dev/null +++ b/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/demo.http @@ -0,0 +1,24 @@ +### Test spam detection with a legitimate email +POST http://localhost:7071/api/spamdetection/run +Content-Type: application/json + +{ + "email_id": "email-001", + "email_content": "Hi John, I hope you're doing well. I wanted to follow up on our meeting yesterday about the quarterly report. Could you please send me the updated figures by Friday? Thanks!" +} + + +### Test spam detection with a spam email +POST http://localhost:7071/api/spamdetection/run +Content-Type: application/json + +{ + "email_id": "email-002", + "email_content": "URGENT! You've won $1,000,000! Click here now to claim your prize! Limited time offer! Don't miss out!" +} + + +### Check the status of the orchestration +@instanceId = + +GET http://localhost:7071/api/spamdetection/status/{{instanceId}} diff --git a/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/function_app.py b/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/function_app.py new file mode 100644 index 0000000000..0e1c1ef677 --- /dev/null +++ b/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/function_app.py @@ -0,0 +1,273 @@ +"""Route email requests through conditional orchestration with two agents. + +Components used in this sample: +- AzureOpenAIChatClient agents for spam detection and email drafting. +- AgentFunctionApp with Durable orchestration, activity, and HTTP triggers. +- Pydantic models that validate payloads and agent JSON responses. + +Prerequisites: set `AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`, +and either `AZURE_OPENAI_API_KEY` or sign in with Azure CLI before running the +Functions host.""" + +import json +import logging +from typing import Any, cast +from collections.abc import Mapping + +import azure.durable_functions as df +import azure.functions as func +from agent_framework.azure import AzureOpenAIChatClient +from azure.durable_functions import DurableOrchestrationContext +from agent_framework.azurefunctions import AgentFunctionApp, get_agent +from pydantic import BaseModel, ValidationError + +logger = logging.getLogger(__name__) + +# 1. Define agent names shared across the orchestration. +SPAM_AGENT_NAME = "SpamDetectionAgent" +EMAIL_AGENT_NAME = "EmailAssistantAgent" + + +class SpamDetectionResult(BaseModel): + is_spam: bool + reason: str + + +class EmailResponse(BaseModel): + response: str + + +class EmailPayload(BaseModel): + email_id: str + email_content: str + +# 2. Instantiate both agents so they can be registered with AgentFunctionApp. +def _create_agents() -> list[Any]: + chat_client = AzureOpenAIChatClient() + + spam_agent = chat_client.create_agent( + name=SPAM_AGENT_NAME, + instructions="You are a spam detection assistant that identifies spam emails.", + ) + + email_agent = chat_client.create_agent( + name=EMAIL_AGENT_NAME, + instructions="You are an email assistant that helps users draft responses to emails with professionalism.", + ) + + return [spam_agent, email_agent] + + +app = AgentFunctionApp(agents=_create_agents(), enable_health_check=True) + + +# 3. Activities handle the side effects for spam and legitimate emails. +@app.activity_trigger(input_name="reason") +def handle_spam_email(reason: str) -> str: + return f"Email marked as spam: {reason}" + + +@app.activity_trigger(input_name="message") +def send_email(message: str) -> str: + return f"Email sent: {message}" + + +# 4. Orchestration validates input, runs agents, and branches on spam results. +@app.orchestration_trigger(context_name="context") +def spam_detection_orchestration(context: DurableOrchestrationContext): + payload_raw = context.get_input() + if not isinstance(payload_raw, Mapping): + raise ValueError("Email data is required") + + try: + payload = EmailPayload.model_validate(payload_raw) + except ValidationError as exc: + raise ValueError(f"Invalid email payload: {exc}") from exc + + spam_agent = get_agent(context, SPAM_AGENT_NAME) + email_agent = get_agent(context, EMAIL_AGENT_NAME) + + spam_thread = spam_agent.get_new_thread() + + spam_prompt = ( + "Analyze this email for spam content and return a JSON response with 'is_spam' (boolean) " + "and 'reason' (string) fields:\n" + f"Email ID: {payload.email_id}\n" + f"Content: {payload.email_content}" + ) + + spam_result_raw = yield spam_agent.run( + messages=spam_prompt, + thread=spam_thread, + response_format=SpamDetectionResult, + ) + + spam_result = cast(SpamDetectionResult, _coerce_structured(spam_result_raw, SpamDetectionResult)) + + if spam_result.is_spam: + result = yield context.call_activity("handle_spam_email", spam_result.reason) + return result + + email_thread = email_agent.get_new_thread() + + email_prompt = ( + "Draft a professional response to this email. Return a JSON response with a 'response' field " + "containing the reply:\n\n" + f"Email ID: {payload.email_id}\n" + f"Content: {payload.email_content}" + ) + + email_result_raw = yield email_agent.run( + messages=email_prompt, + thread=email_thread, + response_format=EmailResponse, + ) + + email_result = cast(EmailResponse, _coerce_structured(email_result_raw, EmailResponse)) + + result = yield context.call_activity("send_email", email_result.response) + return result + + +# 5. HTTP starter endpoint launches the orchestration for each email payload. +@app.route(route="spamdetection/run", methods=["POST"]) +@app.durable_client_input(client_name="client") +async def start_spam_detection_orchestration( + req: func.HttpRequest, + client: df.DurableOrchestrationClient, +) -> func.HttpResponse: + try: + body = req.get_json() + except ValueError: + body = None + + if not isinstance(body, Mapping): + return func.HttpResponse( + body=json.dumps({"error": "Email data is required"}), + status_code=400, + mimetype="application/json", + ) + + try: + payload = EmailPayload.model_validate(body) + except ValidationError as exc: + return func.HttpResponse( + body=json.dumps({"error": f"Invalid email payload: {exc}"}), + status_code=400, + mimetype="application/json", + ) + + instance_id = await client.start_new( + orchestration_function_name="spam_detection_orchestration", + client_input=payload.model_dump(), + ) + + logger.info("[HTTP] Started spam detection orchestration with instance_id: %s", instance_id) + + status_url = _build_status_url(req.url, instance_id, route="spamdetection") + + payload_json = { + "message": "Spam detection orchestration started.", + "emailId": payload.email_id, + "instanceId": instance_id, + "statusQueryGetUri": status_url, + } + + return func.HttpResponse( + body=json.dumps(payload_json), + status_code=202, + mimetype="application/json", + ) + + +# 6. Status endpoint mirrors Durable Functions default payload with agent data. +@app.route(route="spamdetection/status/{instanceId}", methods=["GET"]) +@app.durable_client_input(client_name="client") +async def get_orchestration_status( + req: func.HttpRequest, + client: df.DurableOrchestrationClient, +) -> func.HttpResponse: + instance_id = req.route_params.get("instanceId") + if not instance_id: + return func.HttpResponse( + body=json.dumps({"error": "Missing instanceId"}), + status_code=400, + mimetype="application/json", + ) + + status = await client.get_status(instance_id) + if status is None: + return func.HttpResponse( + body=json.dumps({"error": "Instance not found"}), + status_code=404, + mimetype="application/json", + ) + + response_data: dict[str, Any] = { + "instanceId": status.instance_id, + "runtimeStatus": status.runtime_status.name if status.runtime_status else None, + "createdTime": status.created_time.isoformat() if status.created_time else None, + "lastUpdatedTime": status.last_updated_time.isoformat() if status.last_updated_time else None, + } + + if status.input_ is not None: + response_data["input"] = status.input_ + + if status.output is not None: + response_data["output"] = status.output + + return func.HttpResponse( + body=json.dumps(response_data), + status_code=200, + mimetype="application/json", + ) + + +# 7. Helper utilities keep URL construction and structured parsing deterministic. +def _build_status_url(request_url: str, instance_id: str, *, route: str) -> str: + base_url, _, _ = request_url.partition("/api/") + if not base_url: + base_url = request_url.rstrip("/") + return f"{base_url}/api/{route}/status/{instance_id}" + + +def _coerce_structured(result: Mapping[str, Any], model: type[BaseModel]) -> BaseModel: + structured = result.get("structured_response") if isinstance(result, Mapping) else None + if structured is not None: + return model.model_validate(structured) + + response_text = result.get("response") if isinstance(result, Mapping) else None + if isinstance(response_text, str) and response_text.strip(): + try: + parsed = json.loads(response_text) + if isinstance(parsed, Mapping): + return model.model_validate(parsed) + except json.JSONDecodeError: + logger.warning("[ConditionalOrchestration] Failed to parse agent JSON response; raising error.") + + # If parsing failed, raise to surface the issue to the caller. + raise ValueError(f"Agent response could not be parsed as {model.__name__}.") + + +""" +Expected response from `POST /api/spamdetection/run`: + +HTTP/1.1 202 Accepted +{ + "message": "Spam detection orchestration started.", + "emailId": "123", + "instanceId": "", + "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/" +} + +Expected response from `GET /api/spamdetection/status/{instanceId}` once complete: + +HTTP/1.1 200 OK +{ + "instanceId": "", + "runtimeStatus": "Completed", + "createdTime": "2024-01-01T00:00:00+00:00", + "lastUpdatedTime": "2024-01-01T00:00:10+00:00", + "output": "Email sent: Thank you for reaching out..." +} +""" diff --git a/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/host.json b/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/host.json new file mode 100644 index 0000000000..4ef61f4578 --- /dev/null +++ b/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/host.json @@ -0,0 +1,23 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "maxTelemetryItemsPerSecond": 20 + } + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview", + "version": "[4.*, 5.0.0)" + }, + "extensions": { + "durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING" + } + } + } +} diff --git a/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/local.settings.json.template b/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/local.settings.json.template new file mode 100644 index 0000000000..6c98a7d1cb --- /dev/null +++ b/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/local.settings.json.template @@ -0,0 +1,10 @@ +{ + "IsEncrypted": false, + "Values": { + "FUNCTIONS_WORKER_RUNTIME": "python", + "AzureWebJobsStorage": "UseDevelopmentStorage=true", + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None", + "AZURE_OPENAI_ENDPOINT": "", + "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "" + } +} diff --git a/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/requirements.txt b/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/requirements.txt new file mode 100644 index 0000000000..39ad8a124f --- /dev/null +++ b/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/requirements.txt @@ -0,0 +1,2 @@ +agent-framework-azurefunctions +azure-identity diff --git a/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/README.md b/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/README.md new file mode 100644 index 0000000000..9055c62486 --- /dev/null +++ b/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/README.md @@ -0,0 +1,33 @@ +# Single-Agent Orchestration (HITL) – Python + +This sample demonstrates the human-in-the-loop (HITL) scenario. +A single writer agent iterates on content until a human reviewer approves the +output or a maximum number of attempts is reached. + +## Prerequisites +- Python 3.11 environment with the packages from `requirements.txt` installed. +- Azure Functions Core Tools (`func`) available on the PATH. +- Environment variables `AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`, and either + `AZURE_OPENAI_API_KEY` or an active Azure CLI session. +- Copy `local.settings.json.template` to `local.settings.json` and configure those keys (plus storage settings) before starting the Functions host. + +## What It Shows +- Identical environment variable usage (`AZURE_OPENAI_ENDPOINT`, + `AZURE_OPENAI_DEPLOYMENT`) and HTTP surface area (`/api/hitl/...`). +- Durable orchestrations that pause for external events while maintaining + deterministic state (`context.wait_for_external_event` + timed cancellation). +- Activity functions that encapsulate the out-of-band operations such as notifying +a reviewer and publishing content. + +## Run the Sample +1. Configure the environment variables and install dependencies with `pip install -r requirements.txt`. +2. Start the Functions host in this directory using `func start`. +3. Trigger the orchestration with `demo.http` (or another HTTP client) by POSTing to `/api/hitl/run`. +4. Poll the `statusQueryGetUri` or call `/api/hitl/status/{instanceId}` to monitor progress. +5. POST an approval or rejection payload to `/api/hitl/approve/{instanceId}` to complete the review loop. + +## Expected Responses +- `POST /api/hitl/run` returns a 202 Accepted payload with the Durable Functions instance ID. +- `POST /api/hitl/approve/{instanceId}` echoes the decision that the orchestration receives. +- `GET /api/hitl/status/{instanceId}` reports `runtimeStatus`, custom status messages, and the final content when approved. +The orchestration sets custom status messages, retries on rejection with reviewer feedback, and raises a timeout if human approval does not arrive. diff --git a/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/demo.http b/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/demo.http new file mode 100644 index 0000000000..42f93b8543 --- /dev/null +++ b/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/demo.http @@ -0,0 +1,45 @@ +### Start the HITL content generation orchestration with default timeout (72 hours) +POST http://localhost:7071/api/hitl/run +Content-Type: application/json + +{ + "topic": "The Future of Artificial Intelligence", + "max_review_attempts": 3 +} + + +### Start the HITL content generation orchestration with a short timeout (~4 seconds) +POST http://localhost:7071/api/hitl/run +Content-Type: application/json + +{ + "topic": "The Future of Artificial Intelligence", + "max_review_attempts": 3, + "approval_timeout_hours": 0.001 +} + + +### Replace INSTANCE_ID_GOES_HERE below with the value returned from the POST call +@instanceId= + +### Check the status of the orchestration +GET http://localhost:7071/api/hitl/status/{{instanceId}} + +### Send human approval +POST http://localhost:7071/api/hitl/approve/{{instanceId}} +Content-Type: application/json + +{ + "approved": true, + "feedback": "Great article! The content is well-structured and informative." +} + +### Send human rejection with feedback +POST http://localhost:7071/api/hitl/approve/{{instanceId}} +Content-Type: application/json + +{ + "approved": false, + "feedback": "The article needs more technical depth and better examples." +} + diff --git a/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/function_app.py b/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/function_app.py new file mode 100644 index 0000000000..df70dfb9b9 --- /dev/null +++ b/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/function_app.py @@ -0,0 +1,399 @@ +"""Iterate on generated content with a human-in-the-loop Durable orchestration. + +Components used in this sample: +- AzureOpenAIChatClient for a single writer agent that emits structured JSON. +- AgentFunctionApp with Durable orchestration, HTTP triggers, and activity triggers. +- External events that pause the workflow until a human decision arrives or times out. + +Prerequisites: configure `AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`, and +either `AZURE_OPENAI_API_KEY` or sign in with Azure CLI before running `func start`.""" + +import json +import logging +from datetime import timedelta +from typing import Any +from collections.abc import Mapping + +import azure.durable_functions as df +import azure.functions as func +from agent_framework.azure import AzureOpenAIChatClient +from azure.durable_functions import DurableOrchestrationContext +from agent_framework.azurefunctions import AgentFunctionApp, get_agent +from pydantic import BaseModel, ValidationError + +logger = logging.getLogger(__name__) + +# 1. Define orchestration constants used throughout the workflow. +WRITER_AGENT_NAME = "WriterAgent" +HUMAN_APPROVAL_EVENT = "HumanApproval" + + +class ContentGenerationInput(BaseModel): + topic: str + max_review_attempts: int = 3 + approval_timeout_hours: float = 72 + + +class GeneratedContent(BaseModel): + title: str + content: str + + +class HumanApproval(BaseModel): + approved: bool + feedback: str = "" + + +# 2. Create the writer agent that produces structured JSON responses. +def _create_writer_agent() -> Any: + instructions = ( + "You are a professional content writer who creates high-quality articles on various topics. " + "You write engaging, informative, and well-structured content that follows best practices for readability and accuracy. " + "Return your response as JSON with 'title' and 'content' fields." + ) + + return AzureOpenAIChatClient().create_agent( + name=WRITER_AGENT_NAME, + instructions=instructions, + ) + + +app = AgentFunctionApp(agents=[_create_writer_agent()], enable_health_check=True) + + +# 3. Activities encapsulate external work for review notifications and publishing. +@app.activity_trigger(input_name="content") +def notify_user_for_approval(content: dict[str, Any]) -> None: + model = GeneratedContent.model_validate(content) + logger.info("NOTIFICATION: Please review the following content for approval:") + logger.info("Title: %s", model.title or "(untitled)") + logger.info("Content: %s", model.content) + logger.info("Use the approval endpoint to approve or reject this content.") + + +@app.activity_trigger(input_name="content") +def publish_content(content: dict[str, Any]) -> None: + model = GeneratedContent.model_validate(content) + logger.info("PUBLISHING: Content has been published successfully:") + logger.info("Title: %s", model.title or "(untitled)") + logger.info("Content: %s", model.content) + + +# 4. Orchestration loops until the human approves, times out, or attempts are exhausted. +@app.orchestration_trigger(context_name="context") +def content_generation_hitl_orchestration(context: DurableOrchestrationContext): + payload_raw = context.get_input() + if not isinstance(payload_raw, Mapping): + raise ValueError("Content generation input is required") + + try: + payload = ContentGenerationInput.model_validate(payload_raw) + except ValidationError as exc: + raise ValueError(f"Invalid content generation input: {exc}") from exc + + writer = get_agent(context, WRITER_AGENT_NAME) + writer_thread = writer.get_new_thread() + + context.set_custom_status(f"Starting content generation for topic: {payload.topic}") + + initial_raw = yield writer.run( + messages=f"Write a short article about '{payload.topic}'.", + thread=writer_thread, + response_format=GeneratedContent, + ) + content = _coerce_generated_content(initial_raw) + + attempt = 0 + while attempt < payload.max_review_attempts: + attempt += 1 + context.set_custom_status( + f"Requesting human feedback. Iteration #{attempt}. Timeout: {payload.approval_timeout_hours} hour(s)." + ) + + yield context.call_activity("notify_user_for_approval", content.model_dump()) + + approval_task = context.wait_for_external_event(HUMAN_APPROVAL_EVENT) + timeout_task = context.create_timer( + context.current_utc_datetime + timedelta(hours=payload.approval_timeout_hours) + ) + + winner = yield context.task_any([approval_task, timeout_task]) + + if winner == approval_task: + timeout_task.cancel() # type: ignore[attr-defined] + approval_payload = _parse_human_approval(approval_task.result) + + if approval_payload.approved: + context.set_custom_status("Content approved by human reviewer. Publishing content...") + yield context.call_activity("publish_content", content.model_dump()) + context.set_custom_status( + f"Content published successfully at {context.current_utc_datetime:%Y-%m-%dT%H:%M:%S}" + ) + return {"content": content.content} + + context.set_custom_status( + "Content rejected by human reviewer. Incorporating feedback and regenerating..." + ) + rewrite_prompt = ( + "The content was rejected by a human reviewer. Please rewrite the article incorporating their feedback.\n\n" + f"Human Feedback: {approval_payload.feedback or 'No feedback provided.'}" + ) + rewritten_raw = yield writer.run( + messages=rewrite_prompt, + thread=writer_thread, + response_format=GeneratedContent, + ) + content = _coerce_generated_content(rewritten_raw) + else: + context.set_custom_status( + f"Human approval timed out after {payload.approval_timeout_hours} hour(s). Treating as rejection." + ) + raise TimeoutError( + f"Human approval timed out after {payload.approval_timeout_hours} hour(s)." + ) + + raise RuntimeError(f"Content could not be approved after {payload.max_review_attempts} iteration(s).") + + +# 5. HTTP endpoint that starts the human-in-the-loop orchestration. +@app.route(route="hitl/run", methods=["POST"]) +@app.durable_client_input(client_name="client") +async def start_content_generation( + req: func.HttpRequest, + client: df.DurableOrchestrationClient, +) -> func.HttpResponse: + try: + body = req.get_json() + except ValueError: + body = None + + if not isinstance(body, Mapping): + return func.HttpResponse( + body=json.dumps({"error": "Request body must be valid JSON."}), + status_code=400, + mimetype="application/json", + ) + + try: + payload = ContentGenerationInput.model_validate(body) + except ValidationError as exc: + return func.HttpResponse( + body=json.dumps({"error": f"Invalid content generation input: {exc}"}), + status_code=400, + mimetype="application/json", + ) + + instance_id = await client.start_new( + orchestration_function_name="content_generation_hitl_orchestration", + client_input=payload.model_dump(), + ) + + status_url = _build_status_url(req.url, instance_id, route="hitl") + + payload_json = { + "message": "HITL content generation orchestration started.", + "topic": payload.topic, + "instanceId": instance_id, + "statusQueryGetUri": status_url, + } + + return func.HttpResponse( + body=json.dumps(payload_json), + status_code=202, + mimetype="application/json", + ) + + +# 6. Endpoint that delivers human approval or rejection back into the orchestration. +@app.route(route="hitl/approve/{instanceId}", methods=["POST"]) +@app.durable_client_input(client_name="client") +async def send_human_approval( + req: func.HttpRequest, + client: df.DurableOrchestrationClient, +) -> func.HttpResponse: + instance_id = req.route_params.get("instanceId") + if not instance_id: + return func.HttpResponse( + body=json.dumps({"error": "Missing instanceId in route."}), + status_code=400, + mimetype="application/json", + ) + + try: + body = req.get_json() + except ValueError: + body = None + + if not isinstance(body, Mapping): + return func.HttpResponse( + body=json.dumps({"error": "Approval response is required"}), + status_code=400, + mimetype="application/json", + ) + + try: + approval = HumanApproval.model_validate(body) + except ValidationError as exc: + return func.HttpResponse( + body=json.dumps({"error": f"Invalid approval payload: {exc}"}), + status_code=400, + mimetype="application/json", + ) + + await client.raise_event(instance_id, HUMAN_APPROVAL_EVENT, approval.model_dump()) + + payload_json = { + "message": "Human approval sent to orchestration.", + "instanceId": instance_id, + "approved": approval.approved, + } + + return func.HttpResponse( + body=json.dumps(payload_json), + status_code=200, + mimetype="application/json", + ) + + +# 7. Endpoint that mirrors Durable Functions status plus custom workflow messaging. +@app.route(route="hitl/status/{instanceId}", methods=["GET"]) +@app.durable_client_input(client_name="client") +async def get_orchestration_status( + req: func.HttpRequest, + client: df.DurableOrchestrationClient, +) -> func.HttpResponse: + instance_id = req.route_params.get("instanceId") + if not instance_id: + return func.HttpResponse( + body=json.dumps({"error": "Missing instanceId"}), + status_code=400, + mimetype="application/json", + ) + + status = await client.get_status( + instance_id, + show_history=False, + show_history_output=False, + show_input=True, + ) + + # Check if status is None or if the instance doesn't exist (runtime_status is None) + if status is None or getattr(status, "runtime_status", None) is None: + return func.HttpResponse( + body=json.dumps({"error": "Instance not found."}), + status_code=404, + mimetype="application/json", + ) + + response_data: dict[str, Any] = { + "instanceId": getattr(status, "instance_id", None), + "runtimeStatus": getattr(status.runtime_status, "name", None) + if getattr(status, "runtime_status", None) + else None, + "workflowStatus": getattr(status, "custom_status", None), + } + + if getattr(status, "input_", None) is not None: + response_data["input"] = status.input_ + + if getattr(status, "output", None) is not None: + response_data["output"] = status.output + + failure_details = getattr(status, "failure_details", None) + if failure_details is not None: + response_data["failureDetails"] = failure_details + + return func.HttpResponse( + body=json.dumps(response_data), + status_code=200, + mimetype="application/json", + ) + + +# 8. Helper utilities keep parsing logic deterministic. +def _build_status_url(request_url: str, instance_id: str, *, route: str) -> str: + base_url, _, _ = request_url.partition("/api/") + if not base_url: + base_url = request_url.rstrip("/") + return f"{base_url}/api/{route}/status/{instance_id}" + + +def _coerce_generated_content(result: Mapping[str, Any]) -> GeneratedContent: + structured = result.get("structured_response") if isinstance(result, Mapping) else None + if structured is not None: + return GeneratedContent.model_validate(structured) + + response_text = result.get("response") if isinstance(result, Mapping) else None + if isinstance(response_text, str) and response_text.strip(): + try: + parsed = json.loads(response_text) + if isinstance(parsed, Mapping): + return GeneratedContent.model_validate(parsed) + except json.JSONDecodeError: + logger.warning("[HITL] Failed to parse agent JSON response; falling back to defaults.") + + raise ValueError("Agent response could not be parsed as GeneratedContent.") + + +def _parse_human_approval(raw: Any) -> HumanApproval: + if isinstance(raw, Mapping): + return HumanApproval.model_validate(raw) + + if isinstance(raw, str): + stripped = raw.strip() + if not stripped: + return HumanApproval(approved=False, feedback="") + try: + parsed = json.loads(stripped) + if isinstance(parsed, Mapping): + return HumanApproval.model_validate(parsed) + except json.JSONDecodeError: + logger.debug( + "[HITL] Approval payload is not valid JSON; using string heuristics.", + exc_info=True, + ) + + affirmative = {"true", "yes", "approved", "y", "1"} + negative = {"false", "no", "rejected", "n", "0"} + lower = stripped.lower() + if lower in affirmative: + return HumanApproval(approved=True, feedback="") + if lower in negative: + return HumanApproval(approved=False, feedback="") + return HumanApproval(approved=False, feedback=stripped) + + raise ValueError("Approval payload must be a JSON object or string.") + + +""" +Expected response from `POST /api/hitl/run`: + +HTTP/1.1 202 Accepted +{ + "message": "HITL content generation orchestration started.", + "topic": "Contoso launch", + "instanceId": "", + "statusQueryGetUri": "http://localhost:7071/api/hitl/status/" +} + +Expected response after approving via `POST /api/hitl/approve/{instanceId}`: + +HTTP/1.1 200 OK +{ + "message": "Human approval sent to orchestration.", + "instanceId": "", + "approved": true +} + +Expected response from `GET /api/hitl/status/{instanceId}` once published: + +HTTP/1.1 200 OK +{ + "instanceId": "", + "runtimeStatus": "Completed", + "workflowStatus": "Content published successfully at 2024-01-01T12:00:00", + "output": { + "content": "Thank you for joining the Contoso product launch..." + } +} +""" diff --git a/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/host.json b/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/host.json new file mode 100644 index 0000000000..4ef61f4578 --- /dev/null +++ b/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/host.json @@ -0,0 +1,23 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "maxTelemetryItemsPerSecond": 20 + } + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview", + "version": "[4.*, 5.0.0)" + }, + "extensions": { + "durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING" + } + } + } +} diff --git a/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/local.settings.json.template b/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/local.settings.json.template new file mode 100644 index 0000000000..6c98a7d1cb --- /dev/null +++ b/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/local.settings.json.template @@ -0,0 +1,10 @@ +{ + "IsEncrypted": false, + "Values": { + "FUNCTIONS_WORKER_RUNTIME": "python", + "AzureWebJobsStorage": "UseDevelopmentStorage=true", + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None", + "AZURE_OPENAI_ENDPOINT": "", + "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "" + } +} diff --git a/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/requirements.txt b/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/requirements.txt new file mode 100644 index 0000000000..39ad8a124f --- /dev/null +++ b/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/requirements.txt @@ -0,0 +1,2 @@ +agent-framework-azurefunctions +azure-identity