Python: Add Integration tests for AzureFunctions (#2020)

* Add Integration tests

* Remove DTS extension

* Apply suggestions from code review

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Add pyi file for type safety

* Add samples in readme

* Updated all readme instructions

* Address comments

* Update readmes

* Fix requirements

* Address comments

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Laveesh Rohra
2025-11-10 21:27:17 -08:00
committed by GitHub
Unverified
parent 916b51fe1a
commit 4eb31f120b
50 changed files with 1401 additions and 106 deletions
@@ -0,0 +1,12 @@
# Azure OpenAI Configuration
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
AZURE_OPENAI_CHAT_DEPLOYMENT_NAME=your-deployment-name
AZURE_OPENAI_API_KEY=your-api-key-here
FUNCTIONS_WORKER_RUNTIME=python
RUN_INTEGRATION_TESTS=true
# Azure Functions Configuration
AzureWebJobsStorage=UseDevelopmentStorage=true
DURABLE_TASK_SCHEDULER_CONNECTION_STRING=Endpoint=http://localhost:8080;Authentication=None
# Note: TASKHUB_NAME is not required for integration tests; it is auto-generated per test run.
@@ -0,0 +1,81 @@
# Sample Integration Tests
Integration tests that validate the Durable Agent Framework samples by running them as Azure Functions.
## Setup
### 1. Create `.env` file
Copy `.env.example` to `.env` and fill in your Azure credentials:
```bash
cp .env.example .env
```
Required variables:
- `AZURE_OPENAI_ENDPOINT`
- `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`
- `AZURE_OPENAI_API_KEY`
- `AzureWebJobsStorage`
- `DURABLE_TASK_SCHEDULER_CONNECTION_STRING`
- `FUNCTIONS_WORKER_RUNTIME`
### 2. Start required services
**Azurite (for orchestration tests):**
```bash
docker run -d -p 10000:10000 -p 10001:10001 -p 10002:10002 mcr.microsoft.com/azure-storage/azurite
```
**Durable Task Scheduler:**
```bash
docker run -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
```
## Running Tests
The tests automatically start and stop the Azure Functions app for each sample.
### Run all sample tests
```bash
uv run pytest packages/azurefunctions/tests/integration_tests -v
```
### Run specific sample
```bash
uv run pytest packages/azurefunctions/tests/integration_tests/test_01_single_agent.py -v
```
### Run with verbose output
```bash
uv run pytest packages/azurefunctions/tests/integration_tests -sv
```
## How It Works
Each test file uses pytest markers to automatically configure and start the function app:
```python
pytestmark = [
pytest.mark.sample("01_single_agent"),
pytest.mark.usefixtures("function_app_for_test"),
skip_if_azure_functions_integration_tests_disabled,
]
```
The `function_app_for_test` fixture:
1. Loads environment variables from `.env`
2. Validates required variables are present
3. Starts the function app on a dynamically allocated port
4. Waits for the app to be ready
5. Runs your tests
6. Tears down the function app
## Troubleshooting
**Missing environment variables:**
Ensure your `.env` file contains all required variables from `.env.example`.
**Tests timeout:**
Check that Azure OpenAI credentials are valid and the service is accessible.
@@ -0,0 +1 @@
# Copyright (c) Microsoft. All rights reserved.
@@ -0,0 +1,121 @@
# Copyright (c) Microsoft. All rights reserved.
"""
Pytest configuration for Durable Agent Framework tests.
This module provides fixtures and configuration for pytest.
"""
import subprocess
from collections.abc import Iterator, Mapping
from typing import Any
import pytest
import requests
from .testutils import (
FunctionAppStartupError,
build_base_url,
cleanup_function_app,
find_available_port,
get_sample_path_from_marker,
load_and_validate_env,
start_function_app,
wait_for_function_app_ready,
)
def pytest_configure(config: pytest.Config) -> None:
"""Register custom markers."""
config.addinivalue_line("markers", "orchestration: marks tests that use orchestrations (require Azurite)")
config.addinivalue_line(
"markers",
"sample(path): specify the sample directory path for the test (e.g., @pytest.mark.sample('01_single_agent'))",
)
@pytest.fixture(scope="session")
def function_app_running() -> bool:
"""
Check if the function app is running on localhost:7071.
This fixture can be used to skip tests if the function app is not available.
"""
try:
response = requests.get("http://localhost:7071/api/health", timeout=2)
return response.status_code == 200
except requests.exceptions.RequestException:
return False
@pytest.fixture(scope="session")
def skip_if_no_function_app(function_app_running: bool) -> None:
"""Skip test if function app is not running."""
if not function_app_running:
pytest.skip("Function app is not running on http://localhost:7071")
@pytest.fixture(scope="module")
def function_app_for_test(request: pytest.FixtureRequest) -> Iterator[dict[str, int | str]]:
"""
Start the function app for the corresponding sample based on marker.
This fixture:
1. Determines which sample to run from @pytest.mark.sample()
2. Validates environment variables
3. Starts the function app using 'func start'
4. Waits for the app to be ready
5. Tears down the app after tests complete
Usage:
@pytest.mark.sample("01_single_agent")
@pytest.mark.usefixtures("function_app_for_test")
class TestSample01SingleAgent:
...
"""
# Get sample path from marker
sample_path, error_message = get_sample_path_from_marker(request)
if error_message:
pytest.fail(error_message)
assert sample_path is not None, "Sample path must be resolved before starting the function app"
# Load .env file if it exists and validate required env vars
load_and_validate_env()
max_attempts = 3
last_error: Exception | None = None
func_process: subprocess.Popen[Any] | None = None
base_url = ""
port = 0
for _ in range(max_attempts):
port = find_available_port()
base_url = build_base_url(port)
func_process = start_function_app(sample_path, port)
try:
wait_for_function_app_ready(func_process, port)
last_error = None
break
except FunctionAppStartupError as exc:
last_error = exc
cleanup_function_app(func_process)
func_process = None
if func_process is None:
error_message = f"Function app failed to start after {max_attempts} attempt(s)."
if last_error is not None:
error_message += f" Last error: {last_error}"
pytest.fail(error_message)
try:
yield {"base_url": base_url, "port": port}
finally:
if func_process is not None:
cleanup_function_app(func_process)
@pytest.fixture(scope="module")
def base_url(function_app_for_test: Mapping[str, int | str]) -> str:
"""Expose the function app's base URL to tests."""
return str(function_app_for_test["base_url"])
@@ -0,0 +1,116 @@
# Copyright (c) Microsoft. All rights reserved.
"""
Integration Tests for Single Agent Sample
Tests the single agent sample with various message formats and session management.
The function app is automatically started by the test fixture.
Prerequisites:
- Azure OpenAI credentials configured (see packages/azurefunctions/tests/integration_tests/.env.example)
- Azurite or Azure Storage account configured
Usage:
uv run pytest packages/azurefunctions/tests/integration_tests/test_01_single_agent.py -v
"""
import pytest
from .testutils import SampleTestHelper, skip_if_azure_functions_integration_tests_disabled
# Module-level markers - applied to all tests in this file
pytestmark = [
pytest.mark.sample("01_single_agent"),
pytest.mark.usefixtures("function_app_for_test"),
skip_if_azure_functions_integration_tests_disabled,
]
class TestSampleSingleAgent:
"""Tests for 01_single_agent sample."""
@pytest.fixture(autouse=True)
def _set_base_url(self, base_url: str) -> None:
"""Provide agent-specific base URL for the tests."""
self.base_url = f"{base_url}/api/agents/Joker"
def test_health_check(self, base_url: str) -> None:
"""Test health check endpoint."""
response = SampleTestHelper.get(f"{base_url}/api/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "healthy"
def test_simple_message_json(self) -> None:
"""Test sending a simple message with JSON payload."""
response = SampleTestHelper.post_json(
f"{self.base_url}/run",
{"message": "Tell me a short joke about cloud computing.", "sessionId": "test-simple-json"},
)
# Agent can return 200 (immediate) or 202 (async with wait_for_completion=false)
assert response.status_code in [200, 202]
data = response.json()
if response.status_code == 200:
# Synchronous response - check result directly
assert data["status"] == "success"
assert "response" in data
assert data["message_count"] >= 1
else:
# Async response - check we got correlation info
assert "correlationId" in data or "sessionId" in data
def test_simple_message_plain_text(self) -> None:
"""Test sending a message with plain text payload."""
response = SampleTestHelper.post_text(f"{self.base_url}/run", "Tell me a short joke about networking.")
assert response.status_code in [200, 202]
data = response.json()
if response.status_code == 200:
assert data["status"] == "success"
assert "response" in data
def test_session_key_in_query(self) -> None:
"""Test using sessionKey in query parameter."""
response = SampleTestHelper.post_text(
f"{self.base_url}/run?sessionKey=test-query-session", "Tell me a short joke about weather in Texas."
)
assert response.status_code in [200, 202]
data = response.json()
if response.status_code == 200:
assert data["status"] == "success"
def test_conversation_continuity(self) -> None:
"""Test conversation context is maintained across requests."""
session_id = "test-continuity"
# First message
response1 = SampleTestHelper.post_json(
f"{self.base_url}/run",
{"message": "Tell me a short joke about weather in Seattle.", "sessionId": session_id},
)
assert response1.status_code in [200, 202]
if response1.status_code == 200:
data1 = response1.json()
assert data1["message_count"] == 1
# Second message in same session
response2 = SampleTestHelper.post_json(
f"{self.base_url}/run", {"message": "What about San Francisco?", "sessionId": session_id}
)
assert response2.status_code == 200
data2 = response2.json()
assert data2["message_count"] == 2
else:
# In async mode, we can't easily test message count
# Just verify we can make multiple calls
response2 = SampleTestHelper.post_json(
f"{self.base_url}/run", {"message": "What about Texas?", "sessionId": session_id}
)
assert response2.status_code == 202
if __name__ == "__main__":
pytest.main([__file__, "-v"])
@@ -0,0 +1,59 @@
# Copyright (c) Microsoft. All rights reserved.
"""
Integration Tests for Multi-Agent Sample
Tests the multi-agent sample with different agent endpoints.
The function app is automatically started by the test fixture.
Prerequisites:
- Azure OpenAI credentials configured (see packages/azurefunctions/tests/integration_tests/.env.example)
- Azurite or Azure Storage account configured
Usage:
uv run pytest packages/azurefunctions/tests/integration_tests/test_02_multi_agent.py -v
"""
import pytest
from .testutils import SampleTestHelper, skip_if_azure_functions_integration_tests_disabled
# Module-level markers - applied to all tests in this file
pytestmark = [
pytest.mark.sample("02_multi_agent"),
pytest.mark.usefixtures("function_app_for_test"),
skip_if_azure_functions_integration_tests_disabled,
]
class TestSampleMultiAgent:
"""Tests for 02_multi_agent sample."""
@pytest.fixture(autouse=True)
def _set_agent_urls(self, base_url: str) -> None:
"""Configure base URLs for Weather and Math agents."""
self.weather_base_url = f"{base_url}/api/agents/WeatherAgent"
self.math_base_url = f"{base_url}/api/agents/MathAgent"
def test_weather_agent(self) -> None:
"""Test WeatherAgent endpoint."""
response = SampleTestHelper.post_json(
f"{self.weather_base_url}/run", {"message": "What is the weather in Seattle?"}
)
assert response.status_code == 202
data = response.json()
assert data["status"] == "accepted"
def test_math_agent(self) -> None:
"""Test MathAgent endpoint."""
response = SampleTestHelper.post_json(
f"{self.math_base_url}/run", {"message": "Calculate a 20% tip on a $50 bill"}
)
assert response.status_code == 202
data = response.json()
assert data["status"] == "accepted"
assert "response" in data
if __name__ == "__main__":
pytest.main([__file__, "-v"])
@@ -0,0 +1,79 @@
# Copyright (c) Microsoft. All rights reserved.
"""
Integration Tests for Callbacks Sample
Tests the callbacks sample for event tracking and management.
The function app is automatically started by the test fixture.
Prerequisites:
- Azure OpenAI credentials configured (see packages/azurefunctions/tests/integration_tests/.env.example)
- Azurite or Azure Storage account configured
Usage:
uv run pytest packages/azurefunctions/tests/integration_tests/test_03_callbacks.py -v
"""
import pytest
import requests
from .testutils import (
TIMEOUT,
SampleTestHelper,
skip_if_azure_functions_integration_tests_disabled,
)
# Module-level markers - applied to all tests in this file
pytestmark = [
pytest.mark.sample("03_callbacks"),
pytest.mark.usefixtures("function_app_for_test"),
skip_if_azure_functions_integration_tests_disabled,
]
class TestSampleCallbacks:
"""Tests for 03_callbacks sample."""
@pytest.fixture(autouse=True)
def _set_base_url(self, base_url: str) -> None:
"""Provide the callback agent base URL for each test."""
self.base_url = f"{base_url}/api/agents/CallbackAgent"
def test_agent_with_callbacks(self) -> None:
"""Test agent execution with callback tracking."""
conversation_id = "test-callback"
response = SampleTestHelper.post_json(
f"{self.base_url}/run", {"message": "Tell me about Python", "conversationId": conversation_id}
)
assert response.status_code == 202
data = response.json()
assert data["status"] == "accepted"
def test_get_callbacks(self) -> None:
"""Test retrieving callback events."""
conversation_id = "test-callback-retrieve"
# Send a message first
SampleTestHelper.post_json(f"{self.base_url}/run", {"message": "Hello", "conversationId": conversation_id})
# Get callbacks
response = SampleTestHelper.get(f"{self.base_url}/callbacks/{conversation_id}")
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
def test_delete_callbacks(self) -> None:
"""Test clearing callback events."""
conversation_id = "test-callback-delete"
# Send a message first
SampleTestHelper.post_json(f"{self.base_url}/run", {"message": "Test", "conversationId": conversation_id})
# Delete callbacks
response = requests.delete(f"{self.base_url}/callbacks/{conversation_id}", timeout=TIMEOUT)
assert response.status_code == 204
if __name__ == "__main__":
pytest.main([__file__, "-v"])
@@ -0,0 +1,53 @@
# Copyright (c) Microsoft. All rights reserved.
"""
Integration Tests for Orchestration Chaining Sample
Tests the orchestration chaining sample for sequential agent execution.
The function app is automatically started by the test fixture.
Prerequisites:
- Azure OpenAI credentials configured (see packages/azurefunctions/tests/integration_tests/.env.example)
- Azurite running for durable orchestrations (or Azure Storage account configured)
Usage:
# Start Azurite (if not already running)
azurite &
# Run tests
uv run pytest packages/azurefunctions/tests/integration_tests/test_04_single_agent_orchestration_chaining.py -v
"""
import pytest
from .testutils import SampleTestHelper, skip_if_azure_functions_integration_tests_disabled
# Module-level markers - applied to all tests in this file
pytestmark = [
pytest.mark.sample("04_single_agent_orchestration_chaining"),
pytest.mark.usefixtures("function_app_for_test"),
skip_if_azure_functions_integration_tests_disabled,
]
@pytest.mark.orchestration
class TestSampleOrchestrationChaining:
"""Tests for 04_single_agent_orchestration_chaining sample."""
def test_orchestration_chaining(self, base_url: str) -> None:
"""Test sequential agent calls in orchestration."""
# Start orchestration
response = SampleTestHelper.post_json(f"{base_url}/api/singleagent/run", {})
assert response.status_code == 202
data = response.json()
assert "instanceId" in data
assert "statusQueryGetUri" in data
# Wait for completion with output available
status = SampleTestHelper.wait_for_orchestration_with_output(data["statusQueryGetUri"])
assert status["runtimeStatus"] == "Completed"
assert "output" in status
if __name__ == "__main__":
pytest.main([__file__, "-v"])
@@ -0,0 +1,55 @@
# Copyright (c) Microsoft. All rights reserved.
"""
Integration Tests for MultiAgent Concurrency Sample
Tests the multi-agent concurrency sample for parallel agent execution.
The function app is automatically started by the test fixture.
Prerequisites:
- Azure OpenAI credentials configured (see packages/azurefunctions/tests/integration_tests/.env.example)
- Azurite running for durable orchestrations (or Azure Storage account configured)
Usage:
# Start Azurite (if not already running)
azurite &
# Run tests
uv run pytest packages/azurefunctions/tests/integration_tests/test_05_multi_agent_orchestration_concurrency.py -v
"""
import pytest
from .testutils import SampleTestHelper, skip_if_azure_functions_integration_tests_disabled
# Module-level markers - applied to all tests in this file
pytestmark = [
pytest.mark.orchestration,
pytest.mark.sample("05_multi_agent_orchestration_concurrency"),
pytest.mark.usefixtures("function_app_for_test"),
skip_if_azure_functions_integration_tests_disabled,
]
class TestSampleMultiAgentConcurrency:
"""Tests for 05_multi_agent_orchestration_concurrency sample."""
def test_concurrent_agents(self, base_url: str) -> None:
"""Test multiple agents running concurrently."""
# Start orchestration
response = SampleTestHelper.post_text(f"{base_url}/api/multiagent/run", "What is temperature?")
assert response.status_code == 202
data = response.json()
assert "instanceId" in data
assert "statusQueryGetUri" in data
# Wait for completion
status = SampleTestHelper.wait_for_orchestration(data["statusQueryGetUri"])
assert status["runtimeStatus"] == "Completed"
output = status["output"]
assert "physicist" in output
assert "chemist" in output
if __name__ == "__main__":
pytest.main([__file__, "-v"])
@@ -0,0 +1,73 @@
# Copyright (c) Microsoft. All rights reserved.
"""
Integration Tests for MultiAgent Conditionals Sample
Tests the multi-agent conditionals sample for conditional orchestration logic.
The function app is automatically started by the test fixture.
Prerequisites:
- Azure OpenAI credentials configured (see packages/azurefunctions/tests/integration_tests/.env.example)
- Azurite running for durable orchestrations (or Azure Storage account configured)
Usage:
# Start Azurite (if not already running)
azurite &
# Run tests
uv run pytest packages/azurefunctions/tests/integration_tests/test_06_multi_agent_orchestration_conditionals.py -v
"""
import pytest
from .testutils import SampleTestHelper, skip_if_azure_functions_integration_tests_disabled
# Module-level markers - applied to all tests in this file
pytestmark = [
pytest.mark.orchestration,
pytest.mark.sample("06_multi_agent_orchestration_conditionals"),
pytest.mark.usefixtures("function_app_for_test"),
skip_if_azure_functions_integration_tests_disabled,
]
class TestSampleMultiAgentConditionals:
"""Tests for 06_multi_agent_orchestration_conditionals sample."""
def test_legitimate_email(self, base_url: str) -> None:
"""Test conditional logic with legitimate email."""
response = SampleTestHelper.post_json(
f"{base_url}/api/spamdetection/run",
{
"email_id": "email-test-001",
"email_content": "Hi John, I hope you are doing well. Can you send me the report?",
},
)
assert response.status_code == 202
data = response.json()
assert "instanceId" in data
assert "statusQueryGetUri" in data
# Wait for completion
status = SampleTestHelper.wait_for_orchestration(data["statusQueryGetUri"])
assert status["runtimeStatus"] == "Completed"
assert "Email sent:" in status["output"]
def test_spam_email(self, base_url: str) -> None:
"""Test conditional logic with spam email."""
response = SampleTestHelper.post_json(
f"{base_url}/api/spamdetection/run",
{"email_id": "email-test-002", "email_content": "URGENT! You have won $1,000,000! Click here now!"},
)
assert response.status_code == 202
data = response.json()
assert "instanceId" in data
# Wait for completion
status = SampleTestHelper.wait_for_orchestration(data["statusQueryGetUri"])
assert status["runtimeStatus"] == "Completed"
assert "Email marked as spam:" in status["output"]
if __name__ == "__main__":
pytest.main([__file__, "-v"])
@@ -0,0 +1,185 @@
# Copyright (c) Microsoft. All rights reserved.
"""
Integration Tests for Human-in-the-Loop (HITL) Orchestration Sample
Tests the HITL orchestration sample for content generation with human approval workflow.
The function app is automatically started by the test fixture.
Prerequisites:
- Azure OpenAI credentials configured (see packages/azurefunctions/tests/integration_tests/.env.example)
- Azurite running for durable orchestrations (or Azure Storage account configured)
Usage:
# Start Azurite (if not already running)
azurite &
# Run tests
uv run pytest packages/azurefunctions/tests/integration_tests/test_07_single_agent_orchestration_hitl.py -v
"""
import time
import pytest
from .testutils import SampleTestHelper, skip_if_azure_functions_integration_tests_disabled
# Module-level markers - applied to all tests in this file
pytestmark = [
pytest.mark.sample("07_single_agent_orchestration_hitl"),
pytest.mark.usefixtures("function_app_for_test"),
skip_if_azure_functions_integration_tests_disabled,
]
@pytest.mark.orchestration
class TestSampleHITLOrchestration:
"""Tests for 07_single_agent_orchestration_hitl sample."""
@pytest.fixture(autouse=True)
def _set_hitl_base_url(self, base_url: str) -> None:
"""Prepare the HITL API base URL for the module's tests."""
self.hitl_base_url = f"{base_url}/api/hitl"
def test_hitl_orchestration_approval(self) -> None:
"""Test HITL orchestration with human approval."""
# Start orchestration
response = SampleTestHelper.post_json(
f"{self.hitl_base_url}/run",
{"topic": "artificial intelligence", "max_review_attempts": 3, "approval_timeout_hours": 1.0},
)
assert response.status_code == 202
data = response.json()
assert "instanceId" in data
assert "statusQueryGetUri" in data
assert data["topic"] == "artificial intelligence"
instance_id = data["instanceId"]
# Wait a bit for the orchestration to generate initial content
time.sleep(5)
# Check status to ensure it's waiting for approval
status_response = SampleTestHelper.get(data["statusQueryGetUri"])
assert status_response.status_code == 200
status = status_response.json()
assert status["runtimeStatus"] in ["Running", "Pending"]
# Send approval
approval_response = SampleTestHelper.post_json(
f"{self.hitl_base_url}/approve/{instance_id}", {"approved": True, "feedback": ""}
)
assert approval_response.status_code == 200
approval_data = approval_response.json()
assert approval_data["approved"] is True
# Wait for orchestration to complete
status = SampleTestHelper.wait_for_orchestration(data["statusQueryGetUri"])
assert status["runtimeStatus"] == "Completed"
assert "output" in status
assert "content" in status["output"]
def test_hitl_orchestration_rejection_with_feedback(self) -> None:
"""Test HITL orchestration with rejection and subsequent approval."""
# Start orchestration
response = SampleTestHelper.post_json(
f"{self.hitl_base_url}/run",
{"topic": "machine learning", "max_review_attempts": 3, "approval_timeout_hours": 1.0},
)
assert response.status_code == 202
data = response.json()
instance_id = data["instanceId"]
# Wait for initial content generation
time.sleep(5)
# Send rejection with feedback
rejection_response = SampleTestHelper.post_json(
f"{self.hitl_base_url}/approve/{instance_id}",
{"approved": False, "feedback": "Please make it more concise and focus on practical applications."},
)
assert rejection_response.status_code == 200
# Wait for regeneration
time.sleep(5)
# Check status - should still be running
status_response = SampleTestHelper.get(data["statusQueryGetUri"])
assert status_response.status_code == 200
status = status_response.json()
assert status["runtimeStatus"] in ["Running", "Pending"]
# Now approve the revised content
approval_response = SampleTestHelper.post_json(
f"{self.hitl_base_url}/approve/{instance_id}", {"approved": True, "feedback": ""}
)
assert approval_response.status_code == 200
# Wait for completion
status = SampleTestHelper.wait_for_orchestration(data["statusQueryGetUri"])
assert status["runtimeStatus"] == "Completed"
assert "output" in status
def test_hitl_orchestration_missing_topic(self) -> None:
"""Test HITL orchestration with missing topic."""
response = SampleTestHelper.post_json(f"{self.hitl_base_url}/run", {"max_review_attempts": 3})
assert response.status_code == 400
data = response.json()
assert "error" in data
def test_hitl_get_status(self) -> None:
"""Test getting orchestration status."""
# Start orchestration
response = SampleTestHelper.post_json(
f"{self.hitl_base_url}/run",
{"topic": "quantum computing", "max_review_attempts": 2, "approval_timeout_hours": 1.0},
)
assert response.status_code == 202
data = response.json()
instance_id = data["instanceId"]
# Get status
status_response = SampleTestHelper.get(f"{self.hitl_base_url}/status/{instance_id}")
assert status_response.status_code == 200
status = status_response.json()
assert "instanceId" in status
assert "runtimeStatus" in status
assert status["instanceId"] == instance_id
# Cleanup: approve to complete orchestration
time.sleep(5)
SampleTestHelper.post_json(f"{self.hitl_base_url}/approve/{instance_id}", {"approved": True, "feedback": ""})
def test_hitl_approval_invalid_payload(self) -> None:
"""Test sending approval with invalid payload."""
# Start orchestration first
response = SampleTestHelper.post_json(
f"{self.hitl_base_url}/run",
{"topic": "test topic", "max_review_attempts": 1, "approval_timeout_hours": 1.0},
)
assert response.status_code == 202
data = response.json()
instance_id = data["instanceId"]
time.sleep(3)
# Send approval without 'approved' field
approval_response = SampleTestHelper.post_json(
f"{self.hitl_base_url}/approve/{instance_id}", {"feedback": "Some feedback"}
)
assert approval_response.status_code == 400
error_data = approval_response.json()
assert "error" in error_data
# Cleanup
SampleTestHelper.post_json(f"{self.hitl_base_url}/approve/{instance_id}", {"approved": True, "feedback": ""})
def test_hitl_status_invalid_instance(self) -> None:
"""Test getting status for non-existent instance."""
response = SampleTestHelper.get(f"{self.hitl_base_url}/status/invalid-instance-id")
assert response.status_code == 404
data = response.json()
assert "error" in data
if __name__ == "__main__":
pytest.main([__file__, "-v"])
@@ -0,0 +1,401 @@
# Copyright (c) Microsoft. All rights reserved.
"""
Shared test helper utilities for sample integration tests.
This module provides common utilities for testing Azure Functions samples.
"""
import os
import socket
import subprocess
import sys
import time
import uuid
from contextlib import suppress
from pathlib import Path
from typing import Any
import pytest
import requests
# Configuration
TIMEOUT = 30 # seconds
ORCHESTRATION_TIMEOUT = 180 # seconds for orchestrations
_DEFAULT_HOST = "localhost"
class FunctionAppStartupError(RuntimeError):
"""Raised when the Azure Functions host fails to start reliably."""
pass
def _load_env_file_if_present() -> None:
"""Load environment variables from the local .env file when available."""
env_file = Path(__file__).parent / ".env"
if not env_file.exists():
return
try:
from dotenv import load_dotenv
load_dotenv(env_file)
except ImportError:
# python-dotenv not available; rely on existing environment
pass
def _should_skip_azure_functions_integration_tests() -> tuple[bool, str]:
"""Determine whether Azure Functions integration tests should be skipped."""
_load_env_file_if_present()
run_integration_tests = os.getenv("RUN_INTEGRATION_TESTS", "false").lower() == "true"
if not run_integration_tests:
return (
True,
"Integration tests are disabled. Set RUN_INTEGRATION_TESTS=true to enable Azure Functions sample tests.",
)
endpoint = os.getenv("AZURE_OPENAI_ENDPOINT", "").strip()
if not endpoint or endpoint == "https://your-resource.openai.azure.com/":
return True, "No real AZURE_OPENAI_ENDPOINT provided; skipping integration tests."
deployment_name = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME", "").strip()
if not deployment_name or deployment_name == "your-deployment-name":
return True, "No real AZURE_OPENAI_CHAT_DEPLOYMENT_NAME provided; skipping integration tests."
api_key = os.getenv("AZURE_OPENAI_API_KEY", "").strip()
if not api_key or api_key == "your-api-key-here":
return True, "No real AZURE_OPENAI_API_KEY provided; skipping integration tests."
return False, "Integration tests enabled."
_SKIP_AZURE_FUNCTIONS_INTEGRATION_TESTS, _AZURE_FUNCTIONS_SKIP_REASON = _should_skip_azure_functions_integration_tests()
skip_if_azure_functions_integration_tests_disabled = pytest.mark.skipif(
_SKIP_AZURE_FUNCTIONS_INTEGRATION_TESTS,
reason=_AZURE_FUNCTIONS_SKIP_REASON,
)
class SampleTestHelper:
"""Helper class for testing samples."""
@staticmethod
def post_json(url: str, data: dict[str, Any], timeout: int = TIMEOUT) -> requests.Response:
"""POST JSON data to a URL."""
return requests.post(url, json=data, headers={"Content-Type": "application/json"}, timeout=timeout)
@staticmethod
def post_text(url: str, text: str, timeout: int = TIMEOUT) -> requests.Response:
"""POST plain text to a URL."""
return requests.post(url, data=text, headers={"Content-Type": "text/plain"}, timeout=timeout)
@staticmethod
def get(url: str, timeout: int = TIMEOUT) -> requests.Response:
"""GET request to a URL."""
return requests.get(url, timeout=timeout)
@staticmethod
def wait_for_orchestration(
status_url: str, max_wait: int = ORCHESTRATION_TIMEOUT, poll_interval: int = 2
) -> dict[str, Any]:
"""
Wait for an orchestration to complete.
Args:
status_url: URL to poll for orchestration status
max_wait: Maximum seconds to wait
poll_interval: Seconds between polls
Returns:
Final orchestration status
Raises:
TimeoutError: If orchestration doesn't complete in time
"""
start_time = time.time()
while time.time() - start_time < max_wait:
response = requests.get(status_url, timeout=TIMEOUT)
response.raise_for_status()
status = response.json()
runtime_status = status.get("runtimeStatus", "")
if runtime_status in ["Completed", "Failed", "Terminated"]:
return status
time.sleep(poll_interval)
raise TimeoutError(f"Orchestration did not complete within {max_wait} seconds")
@staticmethod
def wait_for_orchestration_with_output(
status_url: str, max_wait: int = ORCHESTRATION_TIMEOUT, poll_interval: int = 2
) -> dict[str, Any]:
"""
Wait for an orchestration to complete and have output available.
This is a specialized version of wait_for_orchestration that also
ensures the output field is present, handling timing race conditions.
Args:
status_url: URL to poll for orchestration status
max_wait: Maximum seconds to wait
poll_interval: Seconds between polls
Returns:
Final orchestration status with output
Raises:
TimeoutError: If orchestration doesn't complete with output in time
"""
start_time = time.time()
while time.time() - start_time < max_wait:
response = requests.get(status_url, timeout=TIMEOUT)
response.raise_for_status()
status = response.json()
runtime_status = status.get("runtimeStatus", "")
if runtime_status in ["Failed", "Terminated"]:
return status
if runtime_status == "Completed" and status.get("output"):
return status
# If completed but no output, continue polling for a bit more to
# handle the race condition where output has not been persisted yet.
time.sleep(poll_interval)
# Provide detailed error message based on final status
final_response = requests.get(status_url, timeout=TIMEOUT)
final_response.raise_for_status()
final_status = final_response.json()
final_runtime_status = final_status.get("runtimeStatus", "Unknown")
if final_runtime_status == "Completed":
if "output" not in final_status:
raise TimeoutError(
"Orchestration completed but 'output' field is missing after "
f"{max_wait} seconds. Final status: {final_status}"
)
if not final_status["output"]:
raise TimeoutError(
"Orchestration completed but output is empty after "
f"{max_wait} seconds. Final status: {final_status}"
)
raise TimeoutError(
"Orchestration completed with output but validation failed after "
f"{max_wait} seconds. Final status: {final_status}"
)
raise TimeoutError(
"Orchestration did not complete within "
f"{max_wait} seconds. Final status: {final_runtime_status}, "
f"Full status: {final_status}"
)
# Function App Lifecycle Management Helpers
def _resolve_repo_root() -> Path:
"""Resolve the repository root, preferring GITHUB_WORKSPACE when available."""
workspace = os.getenv("GITHUB_WORKSPACE")
if workspace:
candidate = Path(workspace).expanduser()
if not (candidate / "samples").exists() and (candidate / "python" / "samples").exists():
return (candidate / "python").resolve()
return candidate.resolve()
# If `GITHUB_WORKSPACE` is not set,
# go up from testutils.py -> integration_tests -> tests -> azurefunctions -> packages -> python
return Path(__file__).resolve().parents[4]
def get_sample_path_from_marker(request) -> tuple[Path | None, str | None]:
"""
Get sample path from @pytest.mark.sample() marker.
Returns a tuple of (sample_path, error_message).
If successful, error_message is None.
If failed, sample_path is None and error_message contains the reason.
"""
marker = request.node.get_closest_marker("sample")
if not marker:
return (
None,
(
"No @pytest.mark.sample() marker found on test. Add pytestmark with "
"@pytest.mark.sample('sample_name') to the test module."
),
)
if not marker.args:
return (
None,
"@pytest.mark.sample() marker found but no sample name provided. Use @pytest.mark.sample('sample_name').",
)
sample_name = marker.args[0]
repo_root = _resolve_repo_root()
sample_path = repo_root / "samples" / "getting_started" / "azure_functions" / sample_name
if not sample_path.exists():
return None, f"Sample directory does not exist: {sample_path}"
return sample_path, None
def find_available_port(host: str = _DEFAULT_HOST) -> int:
"""Find an available TCP port on the given host."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind((host, 0))
return sock.getsockname()[1]
def build_base_url(port: int, host: str = _DEFAULT_HOST) -> str:
"""Construct a base URL for the Azure Functions host."""
return f"http://{host}:{port}"
def is_port_in_use(port: int, host: str = _DEFAULT_HOST) -> bool:
"""
Check if a port is already in use.
Returns True if the port is in use, False otherwise.
"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
return sock.connect_ex((host, port)) == 0
def load_and_validate_env() -> None:
"""
Load .env file from current directory if it exists,
then validate that required environment variables are present.
Raises pytest.fail if required environment variables are missing.
"""
_load_env_file_if_present()
# Required environment variables for Azure Functions samples
# These match the variables defined in .env.example
required_env_vars = [
"AZURE_OPENAI_ENDPOINT",
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME",
"AzureWebJobsStorage",
"DURABLE_TASK_SCHEDULER_CONNECTION_STRING",
"FUNCTIONS_WORKER_RUNTIME",
]
# Check if required env vars are set
missing_vars = [var for var in required_env_vars if not os.environ.get(var)]
if missing_vars:
pytest.fail(
f"Missing required environment variables: {', '.join(missing_vars)}. "
"Please create a .env file in tests/integration_tests/ based on .env.example or "
"set these variables in your environment."
)
def start_function_app(sample_path: Path, port: int) -> subprocess.Popen:
"""
Start a function app in the specified sample directory.
Returns the subprocess.Popen object for the running process.
"""
env = os.environ.copy()
# Use a unique TASKHUB_NAME for each test run to ensure test isolation.
# This prevents conflicts between parallel or repeated test runs, as Durable Functions
# use the task hub name to separate orchestration state.
env["TASKHUB_NAME"] = f"test{uuid.uuid4().hex[:8]}"
# On Windows, use CREATE_NEW_PROCESS_GROUP to allow proper termination
# shell=True only on Windows to handle PATH resolution
if sys.platform == "win32":
return subprocess.Popen(
["func", "start", "--port", str(port)],
cwd=str(sample_path),
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
shell=True,
env=env,
)
# On Unix, don't use shell=True to avoid shell wrapper issues
return subprocess.Popen(["func", "start", "--port", str(port)], cwd=str(sample_path), env=env)
def wait_for_function_app_ready(func_process: subprocess.Popen, port: int, max_wait: int = 60) -> None:
"""Block until the Azure Functions host responds healthy or fail fast."""
start_time = time.time()
health_url = f"{build_base_url(port)}/api/health"
last_error: Exception | None = None
while time.time() - start_time < max_wait:
# If the process exited early, capture any previously seen error and fail fast.
if func_process.poll() is not None:
raise FunctionAppStartupError(
f"Function app process exited with code {func_process.returncode} before becoming healthy"
) from last_error
if is_port_in_use(port):
try:
response = requests.get(health_url, timeout=5)
if response.status_code == 200:
return
last_error = RuntimeError(f"Health check returned {response.status_code}")
except requests.RequestException as exc:
last_error = exc
time.sleep(1)
raise FunctionAppStartupError(
f"Function app did not become healthy on port {port} within {max_wait} seconds"
) from last_error
def cleanup_function_app(func_process: subprocess.Popen) -> None:
"""
Clean up the function app process and all its children.
Uses psutil if available for more thorough cleanup, falls back to basic termination.
"""
try:
import psutil
if func_process.poll() is None: # Process still running
# Get parent process
parent = psutil.Process(func_process.pid)
# Get all child processes recursively
children = parent.children(recursive=True)
# Kill children first
for child in children:
with suppress(psutil.NoSuchProcess, psutil.AccessDenied):
child.kill()
# Kill parent
with suppress(psutil.NoSuchProcess, psutil.AccessDenied):
parent.kill()
# Wait for all to terminate
_gone, alive = psutil.wait_procs(children + [parent], timeout=3)
# Force kill any remaining
for proc in alive:
with suppress(psutil.NoSuchProcess, psutil.AccessDenied):
proc.kill()
except ImportError:
# Fallback if psutil not available
try:
if func_process.poll() is None:
func_process.kill()
func_process.wait()
except Exception:
# Ignore all exceptions during fallback cleanup; best effort to terminate process.
pass
except Exception:
pass # Best effort cleanup
# Give the port time to be released
time.sleep(2)
@@ -6,11 +6,11 @@ from typing import Any
PACKAGE_NAME = "agent_framework_azurefunctions"
PACKAGE_EXTRA = "azurefunctions"
_IMPORTS = [
"AgentCallbackContext",
"AgentFunctionApp",
"AgentResponseCallbackProtocol",
"DurableAIAgent",
"get_agent",
"AgentCallbackContext",
"AgentResponseCallbackProtocol",
]
@@ -0,0 +1,17 @@
# Copyright (c) Microsoft. All rights reserved.
from agent_framework_azurefunctions import (
AgentCallbackContext,
AgentFunctionApp,
AgentResponseCallbackProtocol,
DurableAIAgent,
get_agent,
)
__all__ = [
"AgentCallbackContext",
"AgentFunctionApp",
"AgentResponseCallbackProtocol",
"DurableAIAgent",
"get_agent",
]
+12
View File
@@ -193,6 +193,18 @@ This directory contains samples demonstrating the capabilities of Microsoft Agen
| [`getting_started/multimodal_input/azure_responses_multimodal.py`](./getting_started/multimodal_input/azure_responses_multimodal.py) | Azure OpenAI Responses with multimodal (image) input example |
| [`getting_started/multimodal_input/openai_chat_multimodal.py`](./getting_started/multimodal_input/openai_chat_multimodal.py) | OpenAI Chat with multimodal (image) input example |
## Azure Functions
| Sample | Description |
|--------|-------------|
| [`getting_started/azure_functions/01_single_agent/`](./getting_started/azure_functions/01_single_agent/) | Host a single agent in Azure Functions with Durable Extension HTTP endpoints and per-session state. |
| [`getting_started/azure_functions/02_multi_agent/`](./getting_started/azure_functions/02_multi_agent/) | Register multiple agents in one function app with dedicated run routes and a health check endpoint. |
| [`getting_started/azure_functions/03_callbacks/`](./getting_started/azure_functions/03_callbacks/) | Capture streaming response telemetry via Durable Extension callbacks exposed through HTTP APIs. |
| [`getting_started/azure_functions/04_single_agent_orchestration_chaining/`](./getting_started/azure_functions/04_single_agent_orchestration_chaining/) | Chain sequential agent executions inside a durable orchestration while preserving the shared thread context. |
| [`getting_started/azure_functions/05_multi_agent_orchestration_concurrency/`](./getting_started/azure_functions/05_multi_agent_orchestration_concurrency/) | Run two agents concurrently within a durable orchestration and combine their domain-specific outputs. |
| [`getting_started/azure_functions/06_multi_agent_orchestration_conditionals/`](./getting_started/azure_functions/06_multi_agent_orchestration_conditionals/) | Route orchestration logic based on structured agent responses for spam detection and reply drafting. |
| [`getting_started/azure_functions/07_single_agent_orchestration_hitl/`](./getting_started/azure_functions/07_single_agent_orchestration_hitl/) | Implement a human-in-the-loop approval loop that iterates on agent output inside a durable orchestration. |
## Observability
| File | Description |
@@ -28,11 +28,10 @@ source .venv/bin/activate
### 2. Install dependencies
- Azure Functions Core Tools 4.x install from the official docs so you can run `func start` locally.
- Azurite storage emulator the sample uses `AzureWebJobsStorage=UseDevelopmentStorage=true`; start Azurite before launching the app.
- Durable Task local backend `DURABLE_TASK_SCHEDULER_CONNECTION_STRING` expects the Durable Task scheduler listening on `http://localhost:8080` (start the Durable Functions emulator if it is not already running).
- [Azure Functions Core Tools 4.x](https://learn.microsoft.com/azure/azure-functions/functions-run-local?tabs=windows%2Cpython%2Cv2&pivots=programming-language-python#install-the-azure-functions-core-tools) install so you can run `func start` locally.
- [Azurite storage emulator](https://learn.microsoft.com/azure/storage/common/storage-use-azurite?tabs=visual-studio) install and start Azurite before launching the app (the sample uses `AzureWebJobsStorage=UseDevelopmentStorage=true`).
- Python dependencies from this folder, run `pip install -r requirements.txt` (or the equivalent in your active virtual environment).
- Copy `local.settings.json.template` to `local.settings.json` and update the values for `AZURE_OPENAI_ENDPOINT` and `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME` (and optionally `AZURE_OPENAI_API_KEY`) with your Azure OpenAI resource details; keep the other values as provided unless you are using custom infrastructure.
- Copy `local.settings.json.template` to `local.settings.json`, then update `AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`, and `AZURE_OPENAI_API_KEY` so the Azure OpenAI SDK can authenticate; keep `TASKHUB_NAME` set to `default` unless you plan to change the durable task hub name.
## Running the Sample
@@ -14,13 +14,11 @@ Content-Type: application/json
{
"message": "Add a security element to it.",
"sessionId": "session-003",
"waitForCompletion": false
"waitForCompletion": true
}
### Ask for a joke (plain text payload)
POST {{agentRoute}}/run
x-wait-for-completion: true
Give me a programming joke about race conditions.
### Retrieve conversation state
GET {{agentRoute}}/session-001
Give me a programming joke about race conditions.
@@ -22,7 +22,7 @@ def _create_agent() -> Any:
# 2. Register the agent with AgentFunctionApp so Azure Functions exposes the required triggers.
app = AgentFunctionApp(agents=[_create_agent()], enable_health_check=True)
app = AgentFunctionApp(agents=[_create_agent()], enable_health_check=True, max_poll_retries=50)
"""
Expected output when invoking `POST /api/agents/Joker/run` with plain-text input:
@@ -3,5 +3,10 @@
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
},
"extensions": {
"durableTask": {
"hubName": "%TASKHUB_NAME%"
}
}
}
@@ -4,7 +4,9 @@
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None",
"TASKHUB_NAME": "default",
"AZURE_OPENAI_ENDPOINT": "<AZURE_OPENAI_ENDPOINT>",
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "<AZURE_OPENAI_CHAT_DEPLOYMENT_NAME>"
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "<AZURE_OPENAI_CHAT_DEPLOYMENT_NAME>",
"AZURE_OPENAI_API_KEY": "<AZURE_OPENAI_API_KEY>"
}
}
@@ -1,2 +1,3 @@
agent-framework-azurefunctions
azure-identity
azure-identity
packaging
@@ -27,11 +27,13 @@ source .venv/bin/activate
### 2. Install dependencies
See the [README.md](../README.md) file in the parent directory for more information on how to configure the environment, including how to install and run common sample dependencies.
- [Azure Functions Core Tools 4.x](https://learn.microsoft.com/azure/azure-functions/functions-run-local?tabs=windows%2Cpython%2Cv2&pivots=programming-language-python#install-the-azure-functions-core-tools) install so you can run `func start` locally.
- [Azurite storage emulator](https://learn.microsoft.com/azure/storage/common/storage-use-azurite?tabs=visual-studio) install and start Azurite before launching the app; the sample expects `AzureWebJobsStorage=UseDevelopmentStorage=true`.
- Python dependencies from this folder, run `pip install -r requirements.txt` (or use the equivalent command in your active virtual environment).
### 3. Configure local settings
Copy `local.settings.json.template` to `local.settings.json`, then set the Azure OpenAI values (`AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`, and optionally `AZURE_OPENAI_API_KEY`) to match your environment.
- Copy `local.settings.json.template` to `local.settings.json`, then set the Azure OpenAI values (`AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`, and `AZURE_OPENAI_API_KEY`) so the SDK can authenticate, and keep `TASKHUB_NAME` set to `default` unless you intend to change the durable task hub name.
## Running the Sample
@@ -51,7 +51,8 @@ Content-Type: application/json
{
"message": "Calculate a 20% tip on a $50 bill",
"sessionId": "math-user-001"
"sessionId": "math-user-001",
"waitForCompletion": true
}
###
@@ -67,7 +67,7 @@ math_agent = chat_client.create_agent(
# 2. Register both agents with AgentFunctionApp to expose their HTTP routes and health check.
app = AgentFunctionApp(agents=[weather_agent, math_agent], enable_health_check=True)
app = AgentFunctionApp(agents=[weather_agent, math_agent], enable_health_check=True, max_poll_retries=50)
# Option 2: Add agents after initialization (commented out as we're using Option 1)
# app = AgentFunctionApp(enable_health_check=True)
@@ -11,5 +11,10 @@
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
},
"extensions": {
"durableTask": {
"hubName": "%TASKHUB_NAME%"
}
}
}
@@ -4,7 +4,9 @@
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None",
"TASKHUB_NAME": "default",
"AZURE_OPENAI_ENDPOINT": "<AZURE_OPENAI_ENDPOINT>",
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "<AZURE_OPENAI_CHAT_DEPLOYMENT_NAME>"
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "<AZURE_OPENAI_CHAT_DEPLOYMENT_NAME>",
"AZURE_OPENAI_API_KEY": "<AZURE_OPENAI_API_KEY>"
}
}
@@ -1,2 +1,3 @@
agent-framework-azurefunctions
packaging
azure-identity
@@ -17,10 +17,10 @@ an HTTP API that can be polled by a web client or dashboard.
## Prerequisites
- Python 3.11+
- Azure Functions Core Tools v4
- Access to an Azure OpenAI deployment (configure the environment variables listed in
`local.settings.json` or export them in your shell)
- Python 3.10+
- [Azure Functions Core Tools 4.x](https://learn.microsoft.com/azure/azure-functions/functions-run-local?tabs=windows%2Cpython%2Cv2&pivots=programming-language-python#install-the-azure-functions-core-tools)
- [Azurite storage emulator](https://learn.microsoft.com/azure/storage/common/storage-use-azurite?tabs=visual-studio) running locally so the sample can use `AzureWebJobsStorage=UseDevelopmentStorage=true`
- Access to an Azure OpenAI deployment with `AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`, and `AZURE_OPENAI_API_KEY` configured (either in `local.settings.json` or exported in your shell)
- Dependencies from `requirements.txt` installed in your environment
> **Note:** The sample stores callback events in memory for simplicity. For production scenarios you
@@ -48,7 +48,7 @@ an HTTP API that can be polled by a web client or dashboard.
pip install -r requirements.txt
```
3. Copy `local.settings.json.template` to `local.settings.json` and update the values (or export them as environment variables) with your Azure resources.
3. Copy `local.settings.json.template` to `local.settings.json` and update `AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`, and `AZURE_OPENAI_API_KEY` (or export them as environment variables) for your Azure resources, making sure `TASKHUB_NAME` remains `default` unless you have changed the durable task hub name.
4. Start the Functions host:
@@ -11,5 +11,10 @@
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
},
"extensions": {
"durableTask": {
"hubName": "%TASKHUB_NAME%"
}
}
}
@@ -4,7 +4,9 @@
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None",
"TASKHUB_NAME": "default",
"AZURE_OPENAI_ENDPOINT": "<AZURE_OPENAI_ENDPOINT>",
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "<AZURE_OPENAI_CHAT_DEPLOYMENT_NAME>"
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "<AZURE_OPENAI_CHAT_DEPLOYMENT_NAME>",
"AZURE_OPENAI_API_KEY": "<AZURE_OPENAI_API_KEY>"
}
}
@@ -1,2 +1,3 @@
agent-framework-azurefunctions
azure-identity
azure-identity
packaging
@@ -9,15 +9,16 @@ preserving the conversation state between runs.
- HTTP endpoints for starting the orchestration and polling for status/output
## Prerequisites
- Python 3.11+
- Azure Functions Core Tools v4
- Local Azure Storage / Azurite and the Durable Task sidecar running
- Python 3.10+
- [Azure Functions Core Tools 4.x](https://learn.microsoft.com/azure/azure-functions/functions-run-local?tabs=windows%2Cpython%2Cv2&pivots=programming-language-python#install-the-azure-functions-core-tools)
- [Azurite storage emulator](https://learn.microsoft.com/azure/storage/common/storage-use-azurite?tabs=visual-studio) running locally so the sample can use `AzureWebJobsStorage=UseDevelopmentStorage=true`
- Environment variables configured:
- `AZURE_OPENAI_ENDPOINT`
- `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`
- `AZURE_OPENAI_API_KEY` (omit if using Azure CLI authentication)
- Copy `local.settings.json.template` to `local.settings.json` and populate those keys (and any storage settings) before running the Functions host.
- Dependencies installed: `pip install -r requirements.txt`
- `AZURE_OPENAI_API_KEY` (required for key-based auth; ensure Azure CLI is logged in if you prefer token-based auth)
- Keep `TASKHUB_NAME` set to `default` unless you intend to change the durable task hub name.
- Copy `local.settings.json.template` to `local.settings.json` and populate those keys—including `AZURE_OPENAI_API_KEY`—along with any storage settings before running the Functions host.
- Install dependencies with `pip install -r requirements.txt`
## Running the Sample
1. Start the Functions host: `func start`.
@@ -1,23 +1,12 @@
{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"maxTelemetryItemsPerSecond": 20
}
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle.Preview",
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
},
"extensions": {
"durableTask": {
"storageProvider": {
"type": "azureManaged",
"connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING"
}
"hubName": "%TASKHUB_NAME%"
}
}
}
@@ -4,7 +4,9 @@
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None",
"TASKHUB_NAME": "default",
"AZURE_OPENAI_ENDPOINT": "<AZURE_OPENAI_ENDPOINT>",
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "<AZURE_OPENAI_CHAT_DEPLOYMENT_NAME>"
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "<AZURE_OPENAI_CHAT_DEPLOYMENT_NAME>",
"AZURE_OPENAI_API_KEY": "<AZURE_OPENAI_API_KEY>"
}
}
@@ -1,2 +1,3 @@
agent-framework-azurefunctions
azure-identity
packaging
@@ -8,15 +8,16 @@ This sample starts a Durable Functions orchestration that runs two agents in par
- HTTP routes (`/api/multiagent/run` and `/api/multiagent/status/{instanceId}`) mirror the .NET sample for parity.
## Prerequisites
- Python 3.11+
- Azure Functions Core Tools v4
- Azurite / Azure Storage emulator and Durable Task sidecar running locally
- Python 3.10+
- [Azure Functions Core Tools 4.x](https://learn.microsoft.com/azure/azure-functions/functions-run-local?tabs=windows%2Cpython%2Cv2&pivots=programming-language-python#install-the-azure-functions-core-tools)
- [Azurite storage emulator](https://learn.microsoft.com/azure/storage/common/storage-use-azurite?tabs=visual-studio) running locally so the sample can use `AzureWebJobsStorage=UseDevelopmentStorage=true`
- Environment variables configured:
- `AZURE_OPENAI_ENDPOINT`
- `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`
- `AZURE_OPENAI_API_KEY` (omit when using Azure CLI auth)
- Copy `local.settings.json.template` to `local.settings.json` and fill in those Azure OpenAI values (and storage settings) before starting the Functions host.
- Install dependencies: `pip install -r requirements.txt`
- `AZURE_OPENAI_API_KEY` (required for key-based auth; ensure the Azure CLI is logged in if you rely on token-based auth)
- Keep `TASKHUB_NAME` set to `default` unless you intend to change the durable task hub name.
- Copy `local.settings.json.template` to `local.settings.json` and fill in those Azure OpenAI values—including `AZURE_OPENAI_API_KEY`—along with storage settings before starting the Functions host.
- Install dependencies with `pip install -r requirements.txt`
## Running the Sample
1. Start the Functions host: `func start`.
@@ -1,23 +1,12 @@
{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"maxTelemetryItemsPerSecond": 20
}
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle.Preview",
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
},
"extensions": {
"durableTask": {
"storageProvider": {
"type": "azureManaged",
"connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING"
}
"hubName": "%TASKHUB_NAME%"
}
}
}
@@ -4,7 +4,9 @@
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None",
"TASKHUB_NAME": "default",
"AZURE_OPENAI_ENDPOINT": "<AZURE_OPENAI_ENDPOINT>",
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "<AZURE_OPENAI_CHAT_DEPLOYMENT_NAME>"
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "<AZURE_OPENAI_CHAT_DEPLOYMENT_NAME>",
"AZURE_OPENAI_API_KEY": "<AZURE_OPENAI_API_KEY>"
}
}
@@ -1,2 +1,3 @@
agent-framework-azurefunctions
azure-identity
packaging
@@ -4,11 +4,12 @@ This sample evaluates incoming emails with a spam detector agent and,
when appropriate, drafts a response using an email assistant agent.
## Prerequisites
- Python 3.11 environment with dependencies from `requirements.txt` installed.
- Azure Functions Core Tools (`func`) available on the PATH.
- Environment variables `AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`, and either
`AZURE_OPENAI_API_KEY` or an active Azure CLI login.
- Copy `local.settings.json.template` to `local.settings.json` and populate those Azure OpenAI settings (and storage values) before starting the host.
- Python 3.10+ environment with dependencies from `requirements.txt` installed.
- [Azure Functions Core Tools 4.x](https://learn.microsoft.com/azure/azure-functions/functions-run-local?tabs=windows%2Cpython%2Cv2&pivots=programming-language-python#install-the-azure-functions-core-tools) available on the PATH.
- [Azurite storage emulator](https://learn.microsoft.com/azure/storage/common/storage-use-azurite?tabs=visual-studio) running locally so the sample can use `AzureWebJobsStorage=UseDevelopmentStorage=true`.
- Environment variables `AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`, and `AZURE_OPENAI_API_KEY`.
- Keep `TASKHUB_NAME` set to `default` unless you intend to change the durable task hub name.
- Copy `local.settings.json.template` to `local.settings.json` and populate those Azure OpenAI settings—including `AZURE_OPENAI_API_KEY`—along with storage values before starting the host.
## Scenario Overview
- Two Azure OpenAI agents share a single deployment: one flags spam, the other drafts replies.
@@ -1,23 +1,12 @@
{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"maxTelemetryItemsPerSecond": 20
}
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle.Preview",
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
},
"extensions": {
"durableTask": {
"storageProvider": {
"type": "azureManaged",
"connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING"
}
"hubName": "%TASKHUB_NAME%"
}
}
}
@@ -4,7 +4,9 @@
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None",
"TASKHUB_NAME": "default",
"AZURE_OPENAI_ENDPOINT": "<AZURE_OPENAI_ENDPOINT>",
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "<AZURE_OPENAI_CHAT_DEPLOYMENT_NAME>"
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "<AZURE_OPENAI_CHAT_DEPLOYMENT_NAME>",
"AZURE_OPENAI_API_KEY": "<AZURE_OPENAI_API_KEY>"
}
}
@@ -1,2 +1,3 @@
agent-framework-azurefunctions
azure-identity
packaging
@@ -5,11 +5,12 @@ A single writer agent iterates on content until a human reviewer approves the
output or a maximum number of attempts is reached.
## Prerequisites
- Python 3.11 environment with the packages from `requirements.txt` installed.
- Azure Functions Core Tools (`func`) available on the PATH.
- Environment variables `AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`, and either
`AZURE_OPENAI_API_KEY` or an active Azure CLI session.
- Copy `local.settings.json.template` to `local.settings.json` and configure those keys (plus storage settings) before starting the Functions host.
- Python 3.10+ environment with the packages from `requirements.txt` installed.
- [Azure Functions Core Tools 4.x](https://learn.microsoft.com/azure/azure-functions/functions-run-local?tabs=windows%2Cpython%2Cv2&pivots=programming-language-python#install-the-azure-functions-core-tools) available on the PATH.
- [Azurite storage emulator](https://learn.microsoft.com/azure/storage/common/storage-use-azurite?tabs=visual-studio) running locally so the sample can use `AzureWebJobsStorage=UseDevelopmentStorage=true`.
- Environment variables `AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`, and `AZURE_OPENAI_API_KEY`.
- Keep `TASKHUB_NAME` set to `default` unless you intend to change the durable task hub name.
- Copy `local.settings.json.template` to `local.settings.json` and configure those keys—including `AZURE_OPENAI_API_KEY`—plus storage settings before starting the Functions host.
## What It Shows
- Identical environment variable usage (`AZURE_OPENAI_ENDPOINT`,
@@ -63,7 +63,7 @@ app = AgentFunctionApp(agents=[_create_writer_agent()], enable_health_check=True
# 3. Activities encapsulate external work for review notifications and publishing.
@app.activity_trigger(input_name="content")
def notify_user_for_approval(content: dict[str, Any]) -> None:
def notify_user_for_approval(content: dict) -> None:
model = GeneratedContent.model_validate(content)
logger.info("NOTIFICATION: Please review the following content for approval:")
logger.info("Title: %s", model.title or "(untitled)")
@@ -72,7 +72,7 @@ def notify_user_for_approval(content: dict[str, Any]) -> None:
@app.activity_trigger(input_name="content")
def publish_content(content: dict[str, Any]) -> None:
def publish_content(content: dict) -> None:
model = GeneratedContent.model_validate(content)
logger.info("PUBLISHING: Content has been published successfully:")
logger.info("Title: %s", model.title or "(untitled)")
@@ -1,23 +1,12 @@
{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"maxTelemetryItemsPerSecond": 20
}
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle.Preview",
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
},
"extensions": {
"durableTask": {
"storageProvider": {
"type": "azureManaged",
"connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING"
}
"hubName": "%TASKHUB_NAME%"
}
}
}
@@ -4,7 +4,9 @@
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None",
"TASKHUB_NAME": "default",
"AZURE_OPENAI_ENDPOINT": "<AZURE_OPENAI_ENDPOINT>",
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "<AZURE_OPENAI_CHAT_DEPLOYMENT_NAME>"
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "<AZURE_OPENAI_CHAT_DEPLOYMENT_NAME>",
"AZURE_OPENAI_API_KEY": "<AZURE_OPENAI_API_KEY>"
}
}
@@ -1,2 +1,3 @@
agent-framework-azurefunctions
azure-identity
packaging