Python: Durable Support for Workflows (#3630)

* Add workflow support for Azure Functions

* fix compatability with latest framework changes and add integration tests

* refactor code

* remove white space

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* align help text with actual port used

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* replace instance id with a place holder

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* remove unused import

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* remove redundant typing import and fix SIM115

* fix latest breaking changes

* fix mypy issues

* clean up imports

* define source marker strings as constants

* fix json module name

* refactor _extract_message_content_from_dict

* refactor serialization

* add helper method for error response construction and remove _extract_message_content_from_dict since it is not needed

* use strict tpe checking for edges

* change how duplicate agent registrations are handled

* cancel approval_task on HITL timeout

* update docstring

* fix: align azurefunctions package with core API changes after rebase

- State.import_state/export_state are now sync (removed await)
- Add State.commit() before export_state() in activity execution
- Rename executor parameter shared_state -> state
- Rename ctx.set_shared_state/get_shared_state -> set_state/get_state (sync)
- WorkflowBuilder now takes start_executor as constructor kwarg
- Update WorkflowOutputEvent -> WorkflowEvent with type='output'
- Update RequestInfoEvent -> WorkflowEvent[Any]
- Update SharedState -> State in test imports
- Update duplicate agent name tests to match new warning behavior
- Update sample README API references

* fix sample check errors

* fix mypy issues

* fix trailing white spaces

* fix test imports

* feat: add durable workflow samples and adapt to main branch changes

- Add workflow samples 09-12 to 04-hosting/azure_functions/
- Adapt to ChatMessage -> Message rename from main
- Adapt to pickle-based checkpoint encoding from main
- Simplify _serialization.py to delegate to core encode/decode
- Fix Message -> WorkflowMessage disambiguation in _context.py
- Remove non-existent _checkpoint_summary import

* fix: update create_checkpoint signature to match superclass

* fix: correct relative link in HITL sample README

* fix: resolve import breakage after rebase (State, DurableAgentThread, get_logger)

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Dmytro Struk <13853051+dmytrostruk@users.noreply.github.com>
This commit is contained in:
Ahmed Muhsin
2026-02-17 16:11:33 -06:00
committed by GitHub
Unverified
parent 9a369c69c0
commit bb3d3c2efc
46 changed files with 5501 additions and 15 deletions
@@ -64,7 +64,7 @@ app = AgentFunctionApp(agents=[_create_writer_agent()], enable_health_check=True
# 3. Activities encapsulate external work for review notifications and publishing.
@app.activity_trigger(input_name="content")
def notify_user_for_approval(content: dict[str, str]) -> None:
def notify_user_for_approval(content: dict) -> None:
model = GeneratedContent.model_validate(content)
logger.info("NOTIFICATION: Please review the following content for approval:")
logger.info("Title: %s", model.title or "(untitled)")
@@ -73,7 +73,7 @@ def notify_user_for_approval(content: dict[str, str]) -> None:
@app.activity_trigger(input_name="content")
def publish_content(content: dict[str, str]) -> None:
def publish_content(content: dict) -> None:
model = GeneratedContent.model_validate(content)
logger.info("PUBLISHING: Content has been published successfully:")
logger.info("Title: %s", model.title or "(untitled)")
@@ -0,0 +1,18 @@
# Local settings
local.settings.json
.env
# Python
__pycache__/
*.py[cod]
.venv/
venv/
# Azure Functions
bin/
obj/
.python_packages/
# IDE
.vscode/
.idea/
@@ -0,0 +1,99 @@
# Workflow with SharedState Sample
This sample demonstrates running **Agent Framework workflows with SharedState** in Azure Durable Functions.
## Overview
This sample shows how to use `AgentFunctionApp` to execute a `WorkflowBuilder` workflow that uses SharedState to pass data between executors. SharedState is a local dictionary maintained by the orchestration that allows executors to share data across workflow steps.
## What This Sample Demonstrates
1. **Workflow Execution** - Running `WorkflowBuilder` workflows in Azure Durable Functions
2. **State APIs** - Using `ctx.set_state()` and `ctx.get_state()` to share data
3. **Conditional Routing** - Routing messages based on spam detection results
4. **Agent + Executor Composition** - Combining AI agents with non-AI function executors
## Workflow Architecture
```
store_email → spam_detector (agent) → to_detection_result → [branch]:
├── If spam: handle_spam → yield "Email marked as spam: {reason}"
└── If not spam: submit_to_email_assistant → email_assistant (agent) → finalize_and_send → yield "Email sent: {response}"
```
### SharedState Usage by Executor
| Executor | SharedState Operations |
|----------|----------------------|
| `store_email` | `set_state("email:{id}", email)`, `set_state("current_email_id", id)` |
| `to_detection_result` | `get_state("current_email_id")` |
| `submit_to_email_assistant` | `get_state("email:{id}")` |
SharedState allows executors to pass large payloads (like email content) by reference rather than through message routing.
## Prerequisites
1. **Azure OpenAI** - Endpoint and deployment configured
2. **Azurite** - For local storage emulation
## Setup
1. Copy `local.settings.json.sample` to `local.settings.json` and configure:
```json
{
"Values": {
"AZURE_OPENAI_ENDPOINT": "https://your-resource.openai.azure.com/",
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "gpt-4o"
}
}
```
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Start Azurite:
```bash
azurite --silent
```
4. Run the function app:
```bash
func start
```
## Testing
Use the `demo.http` file with REST Client extension or curl:
### Test Spam Email
```bash
curl -X POST http://localhost:7071/api/workflow/run \
-H "Content-Type: application/json" \
-d '"URGENT! You have won $1,000,000! Click here to claim!"'
```
### Test Legitimate Email
```bash
curl -X POST http://localhost:7071/api/workflow/run \
-H "Content-Type: application/json" \
-d '"Hi team, reminder about our meeting tomorrow at 10 AM."'
```
## Expected Output
**Spam email:**
```
Email marked as spam: This email exhibits spam characteristics including urgent language, unrealistic claims of monetary winnings, and requests to click suspicious links.
```
**Legitimate email:**
```
Email sent: Hi, Thank you for the reminder about the sprint planning meeting tomorrow at 10 AM. I will review the agenda and come prepared with my updates. See you then!
```
## Related Samples
- `10_workflow_no_shared_state` - Workflow execution without SharedState usage
- `06_multi_agent_orchestration_conditionals` - Manual Durable Functions orchestration with agents
@@ -0,0 +1,31 @@
@endpoint = http://localhost:7071
### Start the workflow with a spam email
POST {{endpoint}}/api/workflow/run
Content-Type: application/json
"URGENT! You have won $1,000,000! Click here to claim your prize now before it expires!"
### Start the workflow with a legitimate email
POST {{endpoint}}/api/workflow/run
Content-Type: application/json
"Hi team, just a reminder about the sprint planning meeting tomorrow at 10 AM. Please review the agenda items in Jira before the call."
### Start the workflow with another legitimate email
POST {{endpoint}}/api/workflow/run
Content-Type: application/json
"Hello, I wanted to follow up on our conversation from last week regarding the project timeline. Could we schedule a brief call this afternoon to discuss the next steps?"
### Start the workflow with a phishing attempt
POST {{endpoint}}/api/workflow/run
Content-Type: application/json
"Dear Customer, Your account has been compromised! Click this link immediately to secure your account: http://totallylegit.suspicious.com/secure"
### Check workflow status (replace {instanceId} with actual instance ID from response)
GET {{endpoint}}/runtime/webhooks/durabletask/instances/{instanceId}
### Purge all orchestration instances (use for cleanup)
POST {{endpoint}}/runtime/webhooks/durabletask/instances/purge?createdTimeFrom=2020-01-01T00:00:00Z&createdTimeTo=2030-12-31T23:59:59Z
@@ -0,0 +1,292 @@
# Copyright (c) Microsoft. All rights reserved.
"""
Sample: Shared state with agents and conditional routing.
Store an email once by id, classify it with a detector agent, then either draft a reply with an assistant
agent or finish with a spam notice. Stream events as the workflow runs.
Purpose:
Show how to:
- Use shared state to decouple large payloads from messages and pass around lightweight references.
- Enforce structured agent outputs with Pydantic models via response_format for robust parsing.
- Route using conditional edges based on a typed intermediate DetectionResult.
- Compose agent backed executors with function style executors and yield the final output when the workflow completes.
Prerequisites:
- Azure OpenAI configured for AzureOpenAIChatClient with required environment variables.
- Authentication via azure-identity. Use DefaultAzureCredential and run az login before executing the sample.
- Familiarity with WorkflowBuilder, executors, conditional edges, and streaming runs.
"""
import logging
import os
from dataclasses import dataclass
from typing import Any
from uuid import uuid4
from agent_framework import (
AgentExecutorRequest,
AgentExecutorResponse,
Message,
Workflow,
WorkflowBuilder,
WorkflowContext,
executor,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework_azurefunctions import AgentFunctionApp
from azure.identity import AzureCliCredential
from pydantic import BaseModel, ValidationError
from typing_extensions import Never
logger = logging.getLogger(__name__)
# Environment variable names
AZURE_OPENAI_ENDPOINT_ENV = "AZURE_OPENAI_ENDPOINT"
AZURE_OPENAI_DEPLOYMENT_ENV = "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME"
AZURE_OPENAI_API_KEY_ENV = "AZURE_OPENAI_API_KEY"
EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"
class DetectionResultAgent(BaseModel):
"""Structured output returned by the spam detection agent."""
is_spam: bool
reason: str
class EmailResponse(BaseModel):
"""Structured output returned by the email assistant agent."""
response: str
@dataclass
class DetectionResult:
"""Internal detection result enriched with the shared state email_id for later lookups."""
is_spam: bool
reason: str
email_id: str
@dataclass
class Email:
"""In memory record stored in shared state to avoid re-sending large bodies on edges."""
email_id: str
email_content: str
def get_condition(expected_result: bool):
"""Create a condition predicate for DetectionResult.is_spam.
Contract:
- If the message is not a DetectionResult, allow it to pass to avoid accidental dead ends.
- Otherwise, return True only when is_spam matches expected_result.
"""
def condition(message: Any) -> bool:
if not isinstance(message, DetectionResult):
return True
return message.is_spam == expected_result
return condition
@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Persist the raw email content in shared state and trigger spam detection.
Responsibilities:
- Generate a unique email_id (UUID) for downstream retrieval.
- Store the Email object under a namespaced key and set the current id pointer.
- Emit an AgentExecutorRequest asking the detector to respond.
"""
new_email = Email(email_id=str(uuid4()), email_content=email_text)
ctx.set_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
ctx.set_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)
await ctx.send_message(
AgentExecutorRequest(messages=[Message(role="user", text=new_email.email_content)], should_respond=True)
)
@executor(id="to_detection_result")
async def to_detection_result(response: AgentExecutorResponse, ctx: WorkflowContext[DetectionResult]) -> None:
"""Parse spam detection JSON into a structured model and enrich with email_id.
Steps:
1) Validate the agent's JSON output into DetectionResultAgent.
2) Retrieve the current email_id from shared state.
3) Send a typed DetectionResult for conditional routing.
"""
try:
parsed = DetectionResultAgent.model_validate_json(response.agent_response.text)
except ValidationError:
# Fallback for empty or invalid response (e.g. due to content filtering)
parsed = DetectionResultAgent(is_spam=True, reason="Agent execution failed or yielded invalid JSON.")
email_id: str = ctx.get_state(CURRENT_EMAIL_ID_KEY)
await ctx.send_message(DetectionResult(is_spam=parsed.is_spam, reason=parsed.reason, email_id=email_id))
@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(detection: DetectionResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Forward non spam email content to the drafting agent.
Guard:
- This path should only receive non spam. Raise if misrouted.
"""
if detection.is_spam:
raise RuntimeError("This executor should only handle non-spam messages.")
# Load the original content by id from shared state and forward it to the assistant.
email: Email = ctx.get_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
await ctx.send_message(
AgentExecutorRequest(messages=[Message(role="user", text=email.email_content)], should_respond=True)
)
@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
"""Validate the drafted reply and yield the final output."""
parsed = EmailResponse.model_validate_json(response.agent_response.text)
await ctx.yield_output(f"Email sent: {parsed.response}")
@executor(id="handle_spam")
async def handle_spam(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
"""Yield output describing why the email was marked as spam."""
if detection.is_spam:
await ctx.yield_output(f"Email marked as spam: {detection.reason}")
else:
raise RuntimeError("This executor should only handle spam messages.")
# ============================================================================
# Workflow Creation
# ============================================================================
def _build_client_kwargs() -> dict[str, Any]:
"""Build Azure OpenAI client configuration from environment variables."""
endpoint = os.getenv(AZURE_OPENAI_ENDPOINT_ENV)
if not endpoint:
raise RuntimeError(f"{AZURE_OPENAI_ENDPOINT_ENV} environment variable is required.")
deployment = os.getenv(AZURE_OPENAI_DEPLOYMENT_ENV)
if not deployment:
raise RuntimeError(f"{AZURE_OPENAI_DEPLOYMENT_ENV} environment variable is required.")
client_kwargs: dict[str, Any] = {
"endpoint": endpoint,
"deployment_name": deployment,
}
api_key = os.getenv(AZURE_OPENAI_API_KEY_ENV)
if api_key:
client_kwargs["api_key"] = api_key
else:
client_kwargs["credential"] = AzureCliCredential()
return client_kwargs
def _create_workflow() -> Workflow:
"""Create the email classification workflow with conditional routing."""
client_kwargs = _build_client_kwargs()
chat_client = AzureOpenAIChatClient(**client_kwargs)
spam_detection_agent = chat_client.as_agent(
instructions=(
"You are a spam detection assistant that identifies spam emails. "
"Always return JSON with fields is_spam (bool) and reason (string)."
),
default_options={"response_format": DetectionResultAgent},
name="spam_detection_agent",
)
email_assistant_agent = chat_client.as_agent(
instructions=(
"You are an email assistant that helps users draft responses to emails with professionalism. "
"Return JSON with a single field 'response' containing the drafted reply."
),
default_options={"response_format": EmailResponse},
name="email_assistant_agent",
)
# Build the workflow graph with conditional edges.
# Flow:
# store_email -> spam_detection_agent -> to_detection_result -> branch:
# False -> submit_to_email_assistant -> email_assistant_agent -> finalize_and_send
# True -> handle_spam
return (
WorkflowBuilder(start_executor=store_email)
.add_edge(store_email, spam_detection_agent)
.add_edge(spam_detection_agent, to_detection_result)
.add_edge(to_detection_result, submit_to_email_assistant, condition=get_condition(False))
.add_edge(to_detection_result, handle_spam, condition=get_condition(True))
.add_edge(submit_to_email_assistant, email_assistant_agent)
.add_edge(email_assistant_agent, finalize_and_send)
.build()
)
# ============================================================================
# Application Entry Point
# ============================================================================
def launch(durable: bool = True) -> AgentFunctionApp | None:
"""Launch the function app or DevUI.
Args:
durable: If True, returns AgentFunctionApp for Azure Functions.
If False, launches DevUI for local MAF development.
"""
if durable:
# Azure Functions mode with Durable Functions
# SharedState is enabled by default, which this sample requires for storing emails
workflow = _create_workflow()
return AgentFunctionApp(workflow=workflow, enable_health_check=True)
# Pure MAF mode with DevUI for local development
from pathlib import Path
from agent_framework.devui import serve
from dotenv import load_dotenv
env_path = Path(__file__).parent / ".env"
load_dotenv(dotenv_path=env_path)
logger.info("Starting Workflow Shared State Sample in MAF mode")
logger.info("Available at: http://localhost:8096")
logger.info("\nThis workflow demonstrates:")
logger.info("- Shared state to decouple large payloads from messages")
logger.info("- Structured agent outputs with Pydantic models")
logger.info("- Conditional routing based on detection results")
logger.info("\nFlow: store_email -> spam_detection -> branch (spam/not spam)")
workflow = _create_workflow()
serve(entities=[workflow], port=8096, auto_open=True)
return None
# Default: Azure Functions mode
# Run with `python function_app.py --maf` for pure MAF mode with DevUI
app = launch(durable=True)
if __name__ == "__main__":
import sys
if "--maf" in sys.argv:
# Run in pure MAF mode with DevUI
launch(durable=False)
else:
print("Usage: python function_app.py --maf")
print(" --maf Run in pure MAF mode with DevUI (http://localhost:8096)")
print("\nFor Azure Functions mode, use: func start")
@@ -0,0 +1,12 @@
{
"version": "2.0",
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
},
"extensions": {
"durableTask": {
"hubName": "%TASKHUB_NAME%"
}
}
}
@@ -0,0 +1,11 @@
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None",
"TASKHUB_NAME": "default",
"FUNCTIONS_WORKER_RUNTIME": "python",
"AZURE_OPENAI_ENDPOINT": "<Your Azure OpenAI endpoint>",
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "<Your Azure OpenAI chat deployment name>"
}
}
@@ -0,0 +1,3 @@
agent-framework-azurefunctions
azure-identity
agents-maf
@@ -0,0 +1,4 @@
# Azure OpenAI Configuration
AZURE_OPENAI_ENDPOINT=https://<your-resource-name>.openai.azure.com/
AZURE_OPENAI_CHAT_DEPLOYMENT_NAME=<your-deployment-name>
AZURE_OPENAI_API_KEY=<your-api-key>
@@ -0,0 +1,2 @@
.env
local.settings.json
@@ -0,0 +1,159 @@
# Workflow Execution Sample
This sample demonstrates running **Agent Framework workflows** in Azure Durable Functions without using SharedState.
## Overview
This sample shows how to use `AgentFunctionApp` with a `WorkflowBuilder` workflow. The workflow is passed directly to `AgentFunctionApp`, which orchestrates execution using Durable Functions:
```python
workflow = _create_workflow() # Build the workflow graph
app = AgentFunctionApp(workflow=workflow)
```
This approach provides durable, fault-tolerant workflow execution with minimal code.
## What This Sample Demonstrates
1. **Workflow Registration** - Pass a `Workflow` directly to `AgentFunctionApp`
2. **Durable Execution** - Workflow executes with Durable Functions durability and scalability
3. **Conditional Routing** - Route messages based on spam detection (is_spam → spam handler, not spam → email assistant)
4. **Agent + Executor Composition** - Combine AI agents with non-AI executor classes
## Workflow Architecture
```
SpamDetectionAgent → [branch based on is_spam]:
├── If spam: SpamHandlerExecutor → yield "Email marked as spam: {reason}"
└── If not spam: EmailAssistantAgent → EmailSenderExecutor → yield "Email sent: {response}"
```
### Components
| Component | Type | Description |
|-----------|------|-------------|
| `SpamDetectionAgent` | AI Agent | Analyzes emails for spam indicators |
| `EmailAssistantAgent` | AI Agent | Drafts professional email responses |
| `SpamHandlerExecutor` | Executor | Handles spam emails (non-AI) |
| `EmailSenderExecutor` | Executor | Sends email responses (non-AI) |
## Prerequisites
1. **Azure OpenAI** - Endpoint and deployment configured
2. **Azurite** - For local storage emulation
## Setup
1. Copy configuration files:
```bash
cp local.settings.json.sample local.settings.json
```
2. Configure `local.settings.json`:
3. Install dependencies:
```bash
pip install -r requirements.txt
```
4. Start Azurite:
```bash
azurite --silent
```
5. Run the function app:
```bash
func start
```
## Testing
Use the `demo.http` file with REST Client extension or curl:
### Test Spam Email
```bash
curl -X POST http://localhost:7071/api/workflow/run \
-H "Content-Type: application/json" \
-d '{"email_id": "test-001", "email_content": "URGENT! You have won $1,000,000! Click here!"}'
```
### Test Legitimate Email
```bash
curl -X POST http://localhost:7071/api/workflow/run \
-H "Content-Type: application/json" \
-d '{"email_id": "test-002", "email_content": "Hi team, reminder about our meeting tomorrow at 10 AM."}'
```
### Check Status
```bash
curl http://localhost:7071/api/workflow/status/{instanceId}
```
## Expected Output
**Spam email:**
```
Email marked as spam: This email exhibits spam characteristics including urgent language, unrealistic claims of monetary winnings, and requests to click suspicious links.
```
**Legitimate email:**
```
Email sent: Hi, Thank you for the reminder about the sprint planning meeting tomorrow at 10 AM. I will be there.
```
## Code Highlights
### Creating the Workflow
```python
workflow = (
WorkflowBuilder()
.set_start_executor(spam_agent)
.add_switch_case_edge_group(
spam_agent,
[
Case(condition=is_spam_detected, target=spam_handler),
Default(target=email_agent),
],
)
.add_edge(email_agent, email_sender)
.build()
)
```
### Registering with AgentFunctionApp
```python
app = AgentFunctionApp(workflow=workflow, enable_health_check=True)
```
### Executor Classes
```python
class SpamHandlerExecutor(Executor):
@handler
async def handle_spam_result(
self,
agent_response: AgentExecutorResponse,
ctx: WorkflowContext[Never, str],
) -> None:
spam_result = SpamDetectionResult.model_validate_json(agent_response.agent_run_response.text)
await ctx.yield_output(f"Email marked as spam: {spam_result.reason}")
```
## Standalone Mode (DevUI)
This sample also supports running standalone for local development:
```python
# Change launch(durable=True) to launch(durable=False) in function_app.py
# Then run:
python function_app.py
```
This starts the DevUI at `http://localhost:8094` for interactive testing.
## Related Samples
- `09_workflow_shared_state` - Workflow with SharedState for passing data between executors
- `06_multi_agent_orchestration_conditionals` - Manual Durable Functions orchestration with agents
@@ -0,0 +1,32 @@
### Start Workflow Orchestration - Spam Email
POST http://localhost:7071/api/workflow/run
Content-Type: application/json
{
"email_id": "email-001",
"email_content": "URGENT! You've won $1,000,000! Click here immediately to claim your prize! Limited time offer - act now!"
}
###
### Start Workflow Orchestration - Legitimate Email
POST http://localhost:7071/api/workflow/run
Content-Type: application/json
{
"email_id": "email-002",
"email_content": "Hi team, just a reminder about our sprint planning meeting tomorrow at 10 AM. Please review the agenda in Jira."
}
###
### Get Workflow Status
# Replace {instanceId} with the actual instance ID from the start response
GET http://localhost:7071/api/workflow/status/{instanceId}
###
### Health Check
GET http://localhost:7071/api/health
###
@@ -0,0 +1,244 @@
# Copyright (c) Microsoft. All rights reserved.
"""Workflow Execution within Durable Functions Orchestrator.
This sample demonstrates running agent framework WorkflowBuilder workflows inside
a Durable Functions orchestrator by manually traversing the workflow graph and
delegating execution to Durable Entities (for agents) and Activities (for other logic).
Key architectural points:
- AgentFunctionApp registers agents as DurableAIAgents.
- WorkflowBuilder uses `DurableAgentDefinition` (a placeholder) to define the graph.
- The orchestrator (`workflow_orchestration`) iterates through the workflow graph.
- When an agent node is encountered, it calls the corresponding `DurableAIAgent` entity.
- When a standard executor node is encountered, it calls an Activity (`ExecuteExecutor`).
This approach allows using the rich structure of `WorkflowBuilder` while leveraging
the statefulness and durability of `DurableAIAgent`s.
"""
import logging
import os
from pathlib import Path
from typing import Any
from agent_framework import (
AgentExecutorResponse,
Case,
Default,
Executor,
Workflow,
WorkflowBuilder,
WorkflowContext,
handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework_azurefunctions import AgentFunctionApp
from azure.identity import AzureCliCredential
from pydantic import BaseModel, ValidationError
from typing_extensions import Never
logger = logging.getLogger(__name__)
AZURE_OPENAI_ENDPOINT_ENV = "AZURE_OPENAI_ENDPOINT"
AZURE_OPENAI_DEPLOYMENT_ENV = "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME"
AZURE_OPENAI_API_KEY_ENV = "AZURE_OPENAI_API_KEY"
SPAM_AGENT_NAME = "SpamDetectionAgent"
EMAIL_AGENT_NAME = "EmailAssistantAgent"
SPAM_DETECTION_INSTRUCTIONS = (
"You are a spam detection assistant that identifies spam emails.\n\n"
"Analyze the email content for spam indicators including:\n"
"1. Suspicious language (urgent, limited time, act now, free money, etc.)\n"
"2. Suspicious links or requests for personal information\n"
"3. Poor grammar or spelling\n"
"4. Requests for money or financial information\n"
"5. Impersonation attempts\n\n"
"Return a JSON response with:\n"
"- is_spam: boolean indicating if it's spam\n"
"- confidence: float between 0.0 and 1.0\n"
"- reason: detailed explanation of your classification"
)
EMAIL_ASSISTANT_INSTRUCTIONS = (
"You are an email assistant that helps users draft responses to legitimate emails.\n\n"
"When you receive an email that has been verified as legitimate:\n"
"1. Draft a professional and appropriate response\n"
"2. Match the tone and formality of the original email\n"
"3. Be helpful and courteous\n"
"4. Keep the response concise but complete\n\n"
"Return a JSON response with:\n"
"- response: the drafted email response"
)
class SpamDetectionResult(BaseModel):
is_spam: bool
confidence: float
reason: str
class EmailResponse(BaseModel):
response: str
class EmailPayload(BaseModel):
email_id: str
email_content: str
def _build_client_kwargs() -> dict[str, Any]:
endpoint = os.getenv(AZURE_OPENAI_ENDPOINT_ENV)
if not endpoint:
raise RuntimeError(f"{AZURE_OPENAI_ENDPOINT_ENV} environment variable is required.")
deployment = os.getenv(AZURE_OPENAI_DEPLOYMENT_ENV)
if not deployment:
raise RuntimeError(f"{AZURE_OPENAI_DEPLOYMENT_ENV} environment variable is required.")
client_kwargs: dict[str, Any] = {
"endpoint": endpoint,
"deployment_name": deployment,
}
api_key = os.getenv(AZURE_OPENAI_API_KEY_ENV)
if api_key:
client_kwargs["api_key"] = api_key
else:
client_kwargs["credential"] = AzureCliCredential()
return client_kwargs
# Executors for non-AI activities (defined at module level)
class SpamHandlerExecutor(Executor):
"""Executor that handles spam emails (non-AI activity)."""
@handler
async def handle_spam_result(
self,
agent_response: AgentExecutorResponse,
ctx: WorkflowContext[Never, str],
) -> None:
"""Mark email as spam and log the reason."""
text = agent_response.agent_response.text
try:
spam_result = SpamDetectionResult.model_validate_json(text)
except ValidationError:
spam_result = SpamDetectionResult(is_spam=True, reason="Invalid JSON from agent")
message = f"Email marked as spam: {spam_result.reason}"
await ctx.yield_output(message)
class EmailSenderExecutor(Executor):
"""Executor that sends email responses (non-AI activity)."""
@handler
async def handle_email_response(
self,
agent_response: AgentExecutorResponse,
ctx: WorkflowContext[Never, str],
) -> None:
"""Send the drafted email response."""
text = agent_response.agent_response.text
try:
email_response = EmailResponse.model_validate_json(text)
except ValidationError:
email_response = EmailResponse(response="Error generating response.")
message = f"Email sent: {email_response.response}"
await ctx.yield_output(message)
# Condition function for routing
def is_spam_detected(message: Any) -> bool:
"""Check if spam was detected in the email."""
if not isinstance(message, AgentExecutorResponse):
return False
try:
result = SpamDetectionResult.model_validate_json(message.agent_response.text)
return result.is_spam
except Exception:
return False
def _create_workflow() -> Workflow:
"""Create the workflow definition."""
client_kwargs = _build_client_kwargs()
chat_client = AzureOpenAIChatClient(**client_kwargs)
spam_agent = chat_client.as_agent(
name=SPAM_AGENT_NAME,
instructions=SPAM_DETECTION_INSTRUCTIONS,
default_options={"response_format": SpamDetectionResult},
)
email_agent = chat_client.as_agent(
name=EMAIL_AGENT_NAME,
instructions=EMAIL_ASSISTANT_INSTRUCTIONS,
default_options={"response_format": EmailResponse},
)
# Executors
spam_handler = SpamHandlerExecutor(id="spam_handler")
email_sender = EmailSenderExecutor(id="email_sender")
# Build workflow
return (
WorkflowBuilder(start_executor=spam_agent)
.add_switch_case_edge_group(
spam_agent,
[
Case(condition=is_spam_detected, target=spam_handler),
Default(target=email_agent),
],
)
.add_edge(email_agent, email_sender)
.build()
)
def launch(durable: bool = True) -> AgentFunctionApp | None:
workflow: Workflow | None = None
if durable:
# Initialize app
workflow = _create_workflow()
return AgentFunctionApp(workflow=workflow)
# Launch the spam detection workflow in DevUI
from agent_framework.devui import serve
from dotenv import load_dotenv
# Load environment variables from .env file
env_path = Path(__file__).parent / ".env"
load_dotenv(dotenv_path=env_path)
logger.info("Starting Multi-Agent Spam Detection Workflow")
logger.info("Available at: http://localhost:8094")
logger.info("\nThis workflow demonstrates:")
logger.info("- Conditional routing based on spam detection")
logger.info("- Mixing AI agents with non-AI executors (like activity functions)")
logger.info("- Path 1 (spam): SpamDetector Agent → SpamHandler Executor")
logger.info("- Path 2 (legitimate): SpamDetector Agent → EmailAssistant Agent → EmailSender Executor")
workflow = _create_workflow()
serve(entities=[workflow], port=8094, auto_open=True)
return None
# Default: Azure Functions mode
# Run with `python function_app.py --maf` for pure MAF mode with DevUI
app = launch(durable=True)
if __name__ == "__main__":
import sys
if "--maf" in sys.argv:
# Run in pure MAF mode with DevUI
launch(durable=False)
else:
print("Usage: python function_app.py --maf")
print(" --maf Run in pure MAF mode with DevUI (http://localhost:8094)")
print("\nFor Azure Functions mode, use: func start")
@@ -0,0 +1,12 @@
{
"version": "2.0",
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
},
"extensions": {
"durableTask": {
"hubName": "%TASKHUB_NAME%"
}
}
}
@@ -0,0 +1,12 @@
{
"IsEncrypted": false,
"Values": {
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None",
"TASKHUB_NAME": "default",
"AZURE_OPENAI_ENDPOINT": "https://<your-resource-name>.openai.azure.com/",
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "<your-deployment-name>",
"AZURE_OPENAI_API_KEY": "<your-api-key>"
}
}
@@ -0,0 +1,3 @@
agent-framework-azurefunctions
agent-framework
azure-identity
@@ -0,0 +1,14 @@
# Azure Functions Runtime Configuration
FUNCTIONS_WORKER_RUNTIME=python
AzureWebJobsStorage=UseDevelopmentStorage=true
# Durable Task Scheduler Configuration
# For local development with DTS emulator: Endpoint=http://localhost:8080;TaskHub=default;Authentication=None
# For Azure: Get connection string from Azure portal
DURABLE_TASK_SCHEDULER_CONNECTION_STRING=Endpoint=http://localhost:8080;TaskHub=default;Authentication=None
TASKHUB_NAME=default
# Azure OpenAI Configuration
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
AZURE_OPENAI_CHAT_DEPLOYMENT_NAME=your-deployment-name
AZURE_OPENAI_API_KEY=your-api-key
@@ -0,0 +1,4 @@
.venv/
__pycache__/
local.settings.json
.env
@@ -0,0 +1,193 @@
# Parallel Workflow Execution Sample
This sample demonstrates **parallel execution** of executors and agents in Azure Durable Functions workflows.
## Overview
This sample showcases three different parallel execution patterns:
1. **Two Executors in Parallel** - Fan-out to multiple activities
2. **Two Agents in Parallel** - Fan-out to multiple entities
3. **Mixed Execution** - Agents and executors can run concurrently
## Workflow Architecture
```
┌─────────────────────────────────────────────────────────────────────────┐
│ PARALLEL WORKFLOW │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Pattern 1: Two Executors in Parallel (Activities) │
│ ───────────────────────────────────────────────── │
│ │
│ input_router ──┬──> [word_count_processor] ────┐ │
│ │ │ │
│ └──> [format_analyzer_processor]┴──> [aggregator] │
│ │
│ Pattern 2: Two Agents in Parallel (Entities) │
│ ───────────────────────────────────────────── │
│ │
│ [prepare_for_agents] ──┬──> [SentimentAgent] ──────┐ │
│ │ │ │
│ └──> [KeywordAgent] ────────┴──> [prepare_for_│
│ mixed] │
│ │
│ Pattern 3: Mixed Agent + Executor in Parallel │
│ ──────────────────────────────────────────────── │
│ │
│ [prepare_for_mixed] ──┬──> [SummaryAgent] ─────────┐ │
│ │ │ │
│ └──> [statistics_processor] ─┴──> [final_report│
│ _executor] │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
## How Parallel Execution Works
### Activities (Executors)
When multiple executors are pending in the same iteration (e.g., after a fan-out edge), they are batched and executed using `task_all()`:
```python
# In _workflow.py - activities execute in parallel
activity_tasks = [context.call_activity("ExecuteExecutor", input) for ...]
results = yield context.task_all(activity_tasks) # All run concurrently!
```
### Agents (Entities)
Different agents can also run in parallel when they're pending in the same iteration:
```python
# Different agents run in parallel
agent_tasks = [agent_a.run(...), agent_b.run(...)]
responses = yield context.task_all(agent_tasks) # Both agents run concurrently!
```
**Note:** Multiple messages to the *same* agent are processed sequentially to maintain conversation coherence.
## Components
| Component | Type | Description |
|-----------|------|-------------|
| `input_router` | Executor | Routes input JSON to parallel processors |
| `word_count_processor` | Executor | Counts words and characters |
| `format_analyzer_processor` | Executor | Analyzes document format |
| `aggregator` | Executor | Combines results from parallel processors |
| `prepare_for_agents` | Executor | Prepares content for agent analysis |
| `SentimentAnalysisAgent` | AI Agent | Analyzes text sentiment |
| `KeywordExtractionAgent` | AI Agent | Extracts keywords and categories |
| `prepare_for_mixed` | Executor | Prepares content for mixed parallel execution |
| `SummaryAgent` | AI Agent | Summarizes the document |
| `statistics_processor` | Executor | Computes document statistics |
| `FinalReportExecutor` | Executor | Compiles final report from all analyses |
## Prerequisites
1. **Azure OpenAI** - Endpoint and deployment configured
2. **DTS Emulator** - For durable task scheduling (recommended)
3. **Azurite** - For Azure Functions internal storage
## Setup
### Option 1: DevUI Mode (Local Development - No Durable Functions)
The sample can run locally without Azure Functions infrastructure using DevUI:
1. Copy the environment template:
```bash
cp .env.template .env
```
2. Configure `.env` with your Azure OpenAI credentials
3. Install dependencies:
```bash
pip install -r requirements.txt
```
4. Run in DevUI mode (set `durable=False` in `function_app.py`):
```bash
python function_app.py
```
5. Open `http://localhost:8095` and provide input:
```json
{
"document_id": "doc-001",
"content": "Your document text here..."
}
```
### Option 2: Durable Functions Mode (Full Azure Functions)
1. Copy configuration files:
```bash
cp .env.template .env
cp local.settings.json.sample local.settings.json
```
2. Configure `local.settings.json` with your Azure OpenAI credentials
3. Install dependencies:
```bash
pip install -r requirements.txt
```
4. Start DTS Emulator:
```bash
docker run -d --name dts-emulator -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
```
5. Start Azurite (or use VS Code extension):
```bash
azurite --silent
```
6. Run the function app (ensure `durable=True` in `function_app.py`):
```bash
func start
```
## Testing
Use the `demo.http` file with REST Client extension or curl:
### Analyze a Document
```bash
curl -X POST http://localhost:7071/api/workflow/run \
-H "Content-Type: application/json" \
-d '{
"document_id": "doc-001",
"content": "The quarterly earnings report shows strong growth in cloud services. Revenue increased by 25%."
}'
```
### Check Status
```bash
curl http://localhost:7071/api/workflow/status/{instanceId}
```
## Observing Parallel Execution
Open the DTS Dashboard at `http://localhost:8082` to observe:
1. **Activity Execution Timeline** - You'll see `word_count_processor` and `format_analyzer_processor` starting at approximately the same time
2. **Agent Execution Timeline** - `SentimentAnalysisAgent` and `KeywordExtractionAgent` also start concurrently
3. **Sequential vs Parallel** - Compare with non-parallel samples to see the time savings
## Expected Output
```json
{
"output": [
"=== Document Analysis Report ===\n\n--- SentimentAnalysisAgent ---\n{\"sentiment\": \"positive\", \"confidence\": 0.85, \"explanation\": \"...\"}\n\n--- KeywordExtractionAgent ---\n{\"keywords\": [\"earnings\", \"growth\", \"cloud\"], \"categories\": [\"finance\", \"technology\"]}"
]
}
```
## Key Takeaways
1. **Parallel execution is automatic** - When multiple executors/agents are pending in the same iteration, they run in parallel
2. **Workflow graph determines parallelism** - Fan-out edges create parallel execution opportunities
3. **Mixed parallelism** - Agents and executors can run concurrently if they're in the same iteration
4. **Same-agent messages are sequential** - To maintain conversation coherence
@@ -0,0 +1,29 @@
### Analyze a document (triggers parallel workflow)
POST http://localhost:7071/api/workflow/run
Content-Type: application/json
{
"document_id": "doc-001",
"content": "The quarterly earnings report shows strong growth in our cloud services division. Revenue increased by 25% compared to last year, driven by enterprise adoption. Customer satisfaction remains high at 92%. However, we face challenges in the mobile segment where competition is intense. Overall, the outlook is positive with expected continued growth in the coming quarters."
}
###
### Short document test
POST http://localhost:7071/api/workflow/run
Content-Type: application/json
{
"document_id": "doc-002",
"content": "Quick update: Project completed successfully. Team performance exceeded expectations."
}
###
### Check workflow status
GET http://localhost:7071/api/workflow/status/{{instanceId}}
###
### Health check
GET http://localhost:7071/api/health
@@ -0,0 +1,524 @@
# Copyright (c) Microsoft. All rights reserved.
"""Parallel Workflow Execution Sample.
This sample demonstrates parallel execution of executors and agents in Azure Durable Functions.
It showcases three different parallel execution patterns:
1. Two executors running concurrently (fan-out to activities)
2. Two agents running concurrently (fan-out to entities)
3. One executor and one agent running concurrently (mixed fan-out)
The workflow simulates a document processing pipeline where:
- A document is analyzed by multiple processors in parallel
- Results are aggregated and then processed by agents
- A summary agent and statistics executor run in parallel
- Finally, combined into a single output
Key architectural points:
- FanOut edges enable parallel execution
- Different agents run in parallel when they're in the same iteration
- Activities (executors) also run in parallel when pending together
- Mixed agent/executor fan-outs execute concurrently
"""
import json
import logging
import os
from dataclasses import dataclass
from typing import Any
from agent_framework import (
AgentExecutorResponse,
Executor,
Workflow,
WorkflowBuilder,
WorkflowContext,
executor,
handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework_azurefunctions import AgentFunctionApp
from azure.identity import AzureCliCredential
from pydantic import BaseModel
from typing_extensions import Never
logger = logging.getLogger(__name__)
AZURE_OPENAI_ENDPOINT_ENV = "AZURE_OPENAI_ENDPOINT"
AZURE_OPENAI_DEPLOYMENT_ENV = "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME"
AZURE_OPENAI_API_KEY_ENV = "AZURE_OPENAI_API_KEY"
# Agent names
SENTIMENT_AGENT_NAME = "SentimentAnalysisAgent"
KEYWORD_AGENT_NAME = "KeywordExtractionAgent"
SUMMARY_AGENT_NAME = "SummaryAgent"
RECOMMENDATION_AGENT_NAME = "RecommendationAgent"
# ============================================================================
# Pydantic Models for structured outputs
# ============================================================================
class SentimentResult(BaseModel):
"""Result from sentiment analysis."""
sentiment: str # positive, negative, neutral
confidence: float
explanation: str
class KeywordResult(BaseModel):
"""Result from keyword extraction."""
keywords: list[str]
categories: list[str]
class SummaryResult(BaseModel):
"""Result from summarization."""
summary: str
key_points: list[str]
class RecommendationResult(BaseModel):
"""Result from recommendation engine."""
recommendations: list[str]
priority: str
@dataclass
class DocumentInput:
"""Input document to be processed."""
document_id: str
content: str
@dataclass
class ProcessorResult:
"""Result from a document processor (executor)."""
processor_name: str
document_id: str
content: str
word_count: int
char_count: int
has_numbers: bool
@dataclass
class AggregatedResults:
"""Aggregated results from parallel processors."""
document_id: str
content: str
processor_results: list[ProcessorResult]
@dataclass
class AgentAnalysis:
"""Analysis result from an agent."""
agent_name: str
result: str
@dataclass
class FinalReport:
"""Final combined report."""
document_id: str
analyses: list[AgentAnalysis]
# ============================================================================
# Executor Definitions (Activities - run in parallel when pending together)
# ============================================================================
@executor(id="input_router")
async def input_router(
doc: str,
ctx: WorkflowContext[DocumentInput]
) -> None:
"""Route input document to parallel processors.
Accepts a JSON string from the HTTP request and converts to DocumentInput.
"""
# Parse the JSON string input
data = json.loads(doc) if isinstance(doc, str) else doc
document = DocumentInput(
document_id=data.get("document_id", "unknown"),
content=data.get("content", ""),
)
logger.info("[input_router] Routing document: %s", document.document_id)
await ctx.send_message(document)
@executor(id="word_count_processor")
async def word_count_processor(
doc: DocumentInput,
ctx: WorkflowContext[ProcessorResult]
) -> None:
"""Process document and count words - runs as an activity."""
logger.info("[word_count_processor] Processing document: %s", doc.document_id)
word_count = len(doc.content.split())
char_count = len(doc.content)
has_numbers = any(c.isdigit() for c in doc.content)
result = ProcessorResult(
processor_name="word_count",
document_id=doc.document_id,
content=doc.content,
word_count=word_count,
char_count=char_count,
has_numbers=has_numbers,
)
await ctx.send_message(result)
@executor(id="format_analyzer_processor")
async def format_analyzer_processor(
doc: DocumentInput,
ctx: WorkflowContext[ProcessorResult]
) -> None:
"""Analyze document format - runs as an activity in parallel with word_count."""
logger.info("[format_analyzer_processor] Processing document: %s", doc.document_id)
# Simple format analysis
lines = doc.content.split("\n")
word_count = len(lines) # Using line count as "word count" for this processor
char_count = sum(len(line) for line in lines)
has_numbers = doc.content.count(".") > 0 # Check for sentences
result = ProcessorResult(
processor_name="format_analyzer",
document_id=doc.document_id,
content=doc.content,
word_count=word_count,
char_count=char_count,
has_numbers=has_numbers,
)
await ctx.send_message(result)
@executor(id="aggregator")
async def aggregator(
results: list[ProcessorResult],
ctx: WorkflowContext[AggregatedResults]
) -> None:
"""Aggregate results from parallel processors - receives fan-in input."""
logger.info("[aggregator] Aggregating %d results", len(results))
# Extract document info from the first result (all have the same content)
document_id = results[0].document_id if results else "unknown"
content = results[0].content if results else ""
aggregated = AggregatedResults(
document_id=document_id,
content=content,
processor_results=results,
)
await ctx.send_message(aggregated)
@executor(id="prepare_for_agents")
async def prepare_for_agents(
aggregated: AggregatedResults,
ctx: WorkflowContext[str]
) -> None:
"""Prepare content for agent analysis - broadcasts to multiple agents."""
logger.info("[prepare_for_agents] Preparing content for agents")
# Send the original content to agents for analysis
await ctx.send_message(aggregated.content)
@executor(id="prepare_for_mixed")
async def prepare_for_mixed(
analyses: list[AgentExecutorResponse],
ctx: WorkflowContext[str]
) -> None:
"""Prepare results for mixed agent+executor parallel processing.
Combines agent analysis results into a string that can be consumed by
both the SummaryAgent and the statistics_processor in parallel.
"""
logger.info("[prepare_for_mixed] Preparing for mixed parallel pattern")
sentiment_text = ""
keyword_text = ""
for analysis in analyses:
executor_id = analysis.executor_id
text = analysis.agent_response.text if analysis.agent_response else ""
if executor_id == SENTIMENT_AGENT_NAME:
sentiment_text = text
elif executor_id == KEYWORD_AGENT_NAME:
keyword_text = text
# Combine into a string that both agent and executor can process
combined = f"Sentiment Analysis: {sentiment_text}\n\nKeyword Extraction: {keyword_text}"
await ctx.send_message(combined)
@executor(id="statistics_processor")
async def statistics_processor(
analysis_text: str,
ctx: WorkflowContext[ProcessorResult]
) -> None:
"""Calculate statistics from the analysis - runs in parallel with SummaryAgent."""
logger.info("[statistics_processor] Calculating statistics")
# Calculate some statistics from the combined analysis
word_count = len(analysis_text.split())
char_count = len(analysis_text)
has_numbers = any(c.isdigit() for c in analysis_text)
result = ProcessorResult(
processor_name="statistics",
document_id="analysis",
content=analysis_text,
word_count=word_count,
char_count=char_count,
has_numbers=has_numbers,
)
await ctx.send_message(result)
class FinalReportExecutor(Executor):
"""Executor that compiles the final report from agent analyses."""
@handler
async def compile_report(
self,
analyses: list[AgentExecutorResponse | ProcessorResult],
ctx: WorkflowContext[Never, str],
) -> None:
"""Compile final report from mixed agent + processor results."""
logger.info("[final_report] Compiling report from %d analyses", len(analyses))
report_parts = ["=== Document Analysis Report ===\n"]
for analysis in analyses:
if isinstance(analysis, AgentExecutorResponse):
agent_name = analysis.executor_id
text = analysis.agent_response.text if analysis.agent_response else "No response"
elif isinstance(analysis, ProcessorResult):
agent_name = f"Processor: {analysis.processor_name}"
text = f"Words: {analysis.word_count}, Chars: {analysis.char_count}"
else:
continue
report_parts.append(f"\n--- {agent_name} ---")
report_parts.append(text)
final_report = "\n".join(report_parts)
await ctx.yield_output(final_report)
class MixedResultCollector(Executor):
"""Collector for mixed agent/executor results."""
@handler
async def collect_mixed_results(
self,
results: list[Any],
ctx: WorkflowContext[Never, str],
) -> None:
"""Collect and format results from mixed parallel execution."""
logger.info("[mixed_collector] Collecting %d mixed results", len(results))
output_parts = ["=== Mixed Parallel Execution Results ===\n"]
for result in results:
if isinstance(result, AgentExecutorResponse):
output_parts.append(f"[Agent: {result.executor_id}]")
output_parts.append(result.agent_response.text if result.agent_response else "No response")
elif isinstance(result, ProcessorResult):
output_parts.append(f"[Processor: {result.processor_name}]")
output_parts.append(f" Words: {result.word_count}, Chars: {result.char_count}")
await ctx.yield_output("\n".join(output_parts))
# ============================================================================
# Workflow Construction
# ============================================================================
def _build_client_kwargs() -> dict[str, Any]:
"""Build Azure OpenAI client kwargs from environment variables."""
endpoint = os.getenv(AZURE_OPENAI_ENDPOINT_ENV)
if not endpoint:
raise RuntimeError(f"{AZURE_OPENAI_ENDPOINT_ENV} environment variable is required.")
deployment = os.getenv(AZURE_OPENAI_DEPLOYMENT_ENV)
if not deployment:
raise RuntimeError(f"{AZURE_OPENAI_DEPLOYMENT_ENV} environment variable is required.")
client_kwargs: dict[str, Any] = {
"endpoint": endpoint,
"deployment_name": deployment,
}
api_key = os.getenv(AZURE_OPENAI_API_KEY_ENV)
if api_key:
client_kwargs["api_key"] = api_key
else:
client_kwargs["credential"] = AzureCliCredential()
return client_kwargs
def _create_workflow() -> Workflow:
"""Create the parallel workflow definition.
Workflow structure demonstrating three parallel patterns:
Pattern 1: Two Executors in Parallel (Fan-out/Fan-in to activities)
────────────────────────────────────────────────────────────────────
┌─> word_count_processor ─────┐
input_router ──┤ ├──> aggregator
└─> format_analyzer_processor ─┘
Pattern 2: Two Agents in Parallel (Fan-out to entities)
────────────────────────────────────────────────────────
prepare_for_agents ─┬─> SentimentAgent ──┐
└─> KeywordAgent ────┤
└──> prepare_for_mixed
Pattern 3: Mixed Agent + Executor in Parallel
──────────────────────────────────────────────
prepare_for_mixed ─┬─> SummaryAgent ────────┐
└─> statistics_processor ─┤
└──> final_report
"""
client_kwargs = _build_client_kwargs()
chat_client = AzureOpenAIChatClient(**client_kwargs)
# Create agents for parallel analysis
sentiment_agent = chat_client.as_agent(
name=SENTIMENT_AGENT_NAME,
instructions=(
"You are a sentiment analysis expert. Analyze the sentiment of the given text. "
"Return JSON with fields: sentiment (positive/negative/neutral), "
"confidence (0.0-1.0), and explanation (brief reasoning)."
),
default_options={"response_format": SentimentResult},
)
keyword_agent = chat_client.as_agent(
name=KEYWORD_AGENT_NAME,
instructions=(
"You are a keyword extraction expert. Extract important keywords and categories "
"from the given text. Return JSON with fields: keywords (list of strings), "
"and categories (list of topic categories)."
),
default_options={"response_format": KeywordResult},
)
# Create summary agent for Pattern 3 (mixed parallel)
summary_agent = chat_client.as_agent(
name=SUMMARY_AGENT_NAME,
instructions=(
"You are a summarization expert. Given analysis results (sentiment and keywords), "
"provide a concise summary. Return JSON with fields: summary (brief text), "
"and key_points (list of main takeaways)."
),
default_options={"response_format": SummaryResult},
)
# Create executor instances
final_report_executor = FinalReportExecutor(id="final_report")
# Build workflow with parallel patterns
return (
WorkflowBuilder(start_executor=input_router)
# Pattern 1: Fan-out to two executors (run in parallel)
.add_fan_out_edges(
source=input_router,
targets=[word_count_processor, format_analyzer_processor],
)
# Fan-in: Both processors send results to aggregator
.add_fan_in_edges(
sources=[word_count_processor, format_analyzer_processor],
target=aggregator,
)
# Prepare content for agent analysis
.add_edge(aggregator, prepare_for_agents)
# Pattern 2: Fan-out to two agents (run in parallel)
.add_fan_out_edges(
source=prepare_for_agents,
targets=[sentiment_agent, keyword_agent],
)
# Fan-in: Collect agent results into prepare_for_mixed
.add_fan_in_edges(
sources=[sentiment_agent, keyword_agent],
target=prepare_for_mixed,
)
# Pattern 3: Fan-out to one agent + one executor (mixed parallel)
.add_fan_out_edges(
source=prepare_for_mixed,
targets=[summary_agent, statistics_processor],
)
# Final fan-in: Collect mixed results
.add_fan_in_edges(
sources=[summary_agent, statistics_processor],
target=final_report_executor,
)
.build()
)
# ============================================================================
# Application Entry Point
# ============================================================================
def launch(durable: bool = True) -> AgentFunctionApp | None:
"""Launch the function app or DevUI."""
workflow: Workflow | None = None
if durable:
workflow = _create_workflow()
return AgentFunctionApp(
workflow=workflow,
enable_health_check=True,
)
from pathlib import Path
from agent_framework.devui import serve
from dotenv import load_dotenv
env_path = Path(__file__).parent / ".env"
load_dotenv(dotenv_path=env_path)
logger.info("Starting Parallel Workflow Sample")
logger.info("Available at: http://localhost:8095")
logger.info("\nThis workflow demonstrates:")
logger.info("- Pattern 1: Two executors running in parallel")
logger.info("- Pattern 2: Two agents running in parallel")
logger.info("- Pattern 3: Mixed agent + executor running in parallel")
logger.info("- Fan-in aggregation of parallel results")
workflow = _create_workflow()
serve(entities=[workflow], port=8095, auto_open=True)
return None
# Default: Azure Functions mode
# Run with `python function_app.py --maf` for pure MAF mode with DevUI
app = launch(durable=True)
if __name__ == "__main__":
import sys
if "--maf" in sys.argv:
# Run in pure MAF mode with DevUI
launch(durable=False)
else:
print("Usage: python function_app.py --maf")
print(" --maf Run in pure MAF mode with DevUI (http://localhost:8095)")
print("\nFor Azure Functions mode, use: func start")
@@ -0,0 +1,12 @@
{
"version": "2.0",
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
},
"extensions": {
"durableTask": {
"hubName": "%TASKHUB_NAME%"
}
}
}
@@ -0,0 +1,12 @@
{
"IsEncrypted": false,
"Values": {
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None",
"TASKHUB_NAME": "default",
"AZURE_OPENAI_ENDPOINT": "https://<your-resource-name>.openai.azure.com/",
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "<your-deployment-name>",
"AZURE_OPENAI_API_KEY": "<your-api-key>"
}
}
@@ -0,0 +1,3 @@
agent-framework-azurefunctions
agent-framework
azure-identity
@@ -0,0 +1,5 @@
# Local settings - copy from local.settings.json.sample and fill in your values
local.settings.json
__pycache__/
*.pyc
.venv/
@@ -0,0 +1,141 @@
# 12. Workflow with Human-in-the-Loop (HITL)
This sample demonstrates how to integrate human approval into a MAF workflow running on Azure Durable Functions using the MAF `request_info` and `@response_handler` pattern.
## Overview
The sample implements a content moderation pipeline:
1. **User starts workflow** with content for publication via HTTP endpoint
2. **AI Agent analyzes** the content for policy compliance
3. **Workflow pauses** and requests human reviewer approval
4. **Human responds** via HTTP endpoint with approval/rejection
5. **Workflow resumes** and publishes or rejects the content
## Key Concepts
### MAF HITL Pattern
This sample uses MAF's built-in human-in-the-loop pattern:
```python
# In an executor, request human input
await ctx.request_info(
request_data=HumanApprovalRequest(...),
response_type=HumanApprovalResponse,
)
# Handle the response in a separate method
@response_handler
async def handle_approval_response(
self,
original_request: HumanApprovalRequest,
response: HumanApprovalResponse,
ctx: WorkflowContext,
) -> None:
# Process the human's decision
...
```
### Automatic HITL Endpoints
`AgentFunctionApp` automatically provides all the HTTP endpoints needed for HITL:
| Endpoint | Description |
|----------|-------------|
| `POST /api/workflow/run` | Start the workflow |
| `GET /api/workflow/status/{instanceId}` | Check status and pending HITL requests |
| `POST /api/workflow/respond/{instanceId}/{requestId}` | Send human response |
| `GET /api/health` | Health check |
### Durable Functions Integration
When running on Durable Functions, the HITL pattern maps to:
| MAF Concept | Durable Functions |
|-------------|-------------------|
| `ctx.request_info()` | Workflow pauses, custom status updated |
| `RequestInfoEvent` | Exposed via status endpoint |
| HTTP response | `client.raise_event(instance_id, request_id, data)` |
| `@response_handler` | Workflow resumes, handler invoked |
## Workflow Architecture
```
┌─────────────────┐ ┌──────────────────────┐ ┌────────────────────────┐
│ Input Router │ ──► │ Content Analyzer │ ──► │ Content Analyzer │
│ Executor │ │ Agent (AI) │ │ Executor (Parse JSON) │
└─────────────────┘ └──────────────────────┘ └────────────────────────┘
┌─────────────────┐ ┌──────────────────────┐
│ Publish │ ◄── │ Human Review │ ◄── HITL PAUSE
│ Executor │ │ Executor │ (wait for external event)
└─────────────────┘ └──────────────────────┘
```
## Prerequisites
1. **Azure OpenAI** - Access to Azure OpenAI with a deployed chat model
2. **Durable Task Scheduler** - Local emulator or Azure deployment
3. **Azurite** - Local Azure Storage emulator
4. **Azure CLI** - For authentication (`az login`)
## Setup
1. Copy the sample settings file:
```bash
cp local.settings.json.sample local.settings.json
```
2. Update `local.settings.json` with your Azure OpenAI credentials:
```json
{
"Values": {
"AZURE_OPENAI_ENDPOINT": "https://your-resource.openai.azure.com/",
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "gpt-4o"
}
}
```
3. Start the local emulators:
```bash
# Terminal 1: Start Azurite
azurite --silent --location .
# Terminal 2: Start Durable Task Scheduler (if using local emulator)
# Follow Durable Task Scheduler setup instructions
```
4. Start the Function App:
```bash
func start
```
## Running in Pure MAF Mode
You can also run this sample in pure MAF mode (without Durable Functions) using the DevUI:
```bash
python function_app.py --maf
```
This launches the DevUI at http://localhost:8096 where you can interact with the workflow directly. This is useful for:
- Local development and debugging
- Testing the HITL pattern without Durable Functions infrastructure
- Comparing behavior between MAF and Durable modes
## Testing
Use the `demo.http` file with the VS Code REST Client extension:
1. **Start workflow** - `POST /api/workflow/run` with content payload
2. **Check status** - `GET /api/workflow/status/{instanceId}` to see pending HITL requests
3. **Send response** - `POST /api/workflow/respond/{instanceId}/{requestId}` with approval
4. **Check result** - `GET /api/workflow/status/{instanceId}` to see final output
## Related Samples
- [07_single_agent_orchestration_hitl](../07_single_agent_orchestration_hitl/) - HITL at orchestrator level (not using MAF pattern)
- [09_workflow_shared_state](../09_workflow_shared_state/) - Workflow with shared state
- [guessing_game_with_human_input](../../../03-workflows/human-in-the-loop/guessing_game_with_human_input.py) - MAF HITL pattern (non-durable)
@@ -0,0 +1,123 @@
### ============================================================================
### Workflow HITL Sample - Content Moderation with Human Approval
### ============================================================================
### This sample demonstrates MAF workflows with human-in-the-loop using the
### request_info / @response_handler pattern on Azure Durable Functions.
###
### The AgentFunctionApp automatically provides all HITL endpoints.
###
### Prerequisites:
### 1. Start Azurite: azurite --silent --location .
### 2. Start Durable Task Scheduler emulator
### 3. Configure local.settings.json with Azure OpenAI credentials
### 4. Run: func start
### ============================================================================
### ============================================================================
### 1. Start the Workflow with Content for Moderation
### ============================================================================
### This starts the workflow. The AI will analyze the content, then the workflow
### will pause waiting for human approval.
POST http://localhost:7071/api/workflow/run
Content-Type: application/json
{
"content_id": "article-001",
"title": "Introduction to AI in Healthcare",
"body": "Artificial intelligence is revolutionizing healthcare by enabling faster diagnosis, personalized treatment plans, and improved patient outcomes. Machine learning algorithms can analyze medical images with remarkable accuracy, often detecting issues that human radiologists might miss.",
"author": "Dr. Jane Smith"
}
### ============================================================================
### 2. Start Workflow with Potentially Problematic Content
### ============================================================================
### This content should trigger higher risk assessment from the AI analyzer.
POST http://localhost:7071/api/workflow/run
Content-Type: application/json
{
"content_id": "article-002",
"title": "Get Rich Quick Scheme",
"body": "Click here NOW to make $10,000 overnight! This SECRET method is GUARANTEED to work! Limited time offer - act NOW before it's too late! Send your bank details immediately!",
"author": "Definitely Not Spam"
}
### ============================================================================
### 3. Check Workflow Status
### ============================================================================
### Replace INSTANCE_ID with the value returned from the run call.
### The status will show pending HITL requests if waiting for human approval.
@instanceId = 3130c486c9374e4e87125cbd9a238dfc
GET http://localhost:7071/api/workflow/status/{{instanceId}}
### ============================================================================
### 4. Send Human Approval
### ============================================================================
### Approve the content for publication.
### Replace INSTANCE_ID and REQUEST_ID with values from the status response.
@requestId = 1682e5f8-0917-4b68-aa04-d4688cfa2e69
POST http://localhost:7071/api/workflow/respond/{{instanceId}}/{{requestId}}
Content-Type: application/json
{
"approved": true,
"reviewer_notes": "Content is appropriate and well-written. Approved for publication."
}
### ============================================================================
### 5. Send Human Rejection
### ============================================================================
### Reject the content with feedback.
POST http://localhost:7071/api/workflow/respond/{{instanceId}}/{{requestId}}
Content-Type: application/json
{
"approved": false,
"reviewer_notes": "Content appears to be spam. Contains multiple spam indicators including urgency language, promises of easy money, and requests for personal information."
}
### ============================================================================
### Example Workflow - Complete Happy Path
### ============================================================================
###
### Step 1: Start workflow with content
### POST http://localhost:7071/api/workflow/run
### -> Returns instanceId: "abc123..."
###
### Step 2: Check status (workflow is waiting for human input)
### GET http://localhost:7071/api/workflow/status/abc123
### -> Returns pendingHumanInputRequests with requestId: "req-456..."
###
### Step 3: Approve content
### POST http://localhost:7071/api/workflow/respond/abc123/req-456
### {
### "approved": true,
### "reviewer_notes": "Looks good!"
### }
### -> Returns success
###
### Step 4: Check final status
### GET http://localhost:7071/api/workflow/status/abc123
### -> Returns runtimeStatus: "Completed", output: "✅ Content approved..."
###
### ============================================================================
### ============================================================================
### Health Check
### ============================================================================
GET http://localhost:7071/api/health
@@ -0,0 +1,469 @@
# Copyright (c) Microsoft. All rights reserved.
"""Workflow with Human-in-the-Loop (HITL) using MAF request_info Pattern.
This sample demonstrates how to integrate human approval into a MAF workflow
running on Azure Durable Functions. It uses the MAF `request_info` and
`@response_handler` pattern for structured HITL interactions.
The workflow simulates a content moderation pipeline:
1. User submits content for publication
2. An AI agent analyzes the content for policy compliance
3. A human reviewer is prompted to approve/reject the content
4. Based on approval, content is either published or rejected
Key architectural points:
- Uses MAF's `ctx.request_info()` to pause workflow and request human input
- Uses `@response_handler` decorator to handle the human's response
- AgentFunctionApp automatically provides HITL endpoints for status and response
- Durable Functions provides durability while waiting for human input
Prerequisites:
- Azure OpenAI configured with required environment variables
- Durable Task Scheduler connection string
- Authentication via Azure CLI (az login)
"""
import json
import logging
import os
from dataclasses import dataclass
from typing import Any
from agent_framework import (
AgentExecutorRequest,
AgentExecutorResponse,
Message,
Executor,
Workflow,
WorkflowBuilder,
WorkflowContext,
handler,
response_handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework_azurefunctions import AgentFunctionApp
from azure.identity import AzureCliCredential
from pydantic import BaseModel, ValidationError
from typing_extensions import Never
logger = logging.getLogger(__name__)
# Environment variable names
AZURE_OPENAI_ENDPOINT_ENV = "AZURE_OPENAI_ENDPOINT"
AZURE_OPENAI_DEPLOYMENT_ENV = "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME"
AZURE_OPENAI_API_KEY_ENV = "AZURE_OPENAI_API_KEY"
# Agent names
CONTENT_ANALYZER_AGENT_NAME = "ContentAnalyzerAgent"
# ============================================================================
# Data Models
# ============================================================================
class ContentAnalysisResult(BaseModel):
"""Structured output from the content analysis agent."""
is_appropriate: bool
risk_level: str # low, medium, high
concerns: list[str]
recommendation: str
@dataclass
class ContentSubmission:
"""Content submitted for moderation."""
content_id: str
title: str
body: str
author: str
@dataclass
class HumanApprovalRequest:
"""Request sent to human reviewer for approval.
This is the payload passed to ctx.request_info() and will be
exposed via the orchestration status for external systems to retrieve.
"""
content_id: str
title: str
body: str
author: str
ai_analysis: ContentAnalysisResult
prompt: str
class HumanApprovalResponse(BaseModel):
"""Response from human reviewer.
This is what the external system must send back via the HITL response endpoint.
"""
approved: bool
reviewer_notes: str = ""
@dataclass
class ModerationResult:
"""Final result of the moderation workflow."""
content_id: str
status: str # "approved", "rejected"
ai_analysis: ContentAnalysisResult | None
reviewer_notes: str
# ============================================================================
# Agent Instructions
# ============================================================================
CONTENT_ANALYZER_INSTRUCTIONS = """You are a content moderation assistant that analyzes user-submitted content
for policy compliance. Evaluate the content for:
1. Appropriateness - Is the content suitable for a general audience?
2. Risk level - Rate as 'low', 'medium', or 'high' based on potential issues
3. Concerns - List any specific issues found (empty list if none)
4. Recommendation - Provide a brief recommendation for human reviewers
Return a JSON response with:
- is_appropriate: boolean
- risk_level: string ('low', 'medium', 'high')
- concerns: list of strings
- recommendation: string
Be thorough but fair in your analysis."""
# ============================================================================
# Executors
# ============================================================================
@dataclass
class AnalysisWithSubmission:
"""Combines the AI analysis with the original submission for downstream processing."""
submission: ContentSubmission
analysis: ContentAnalysisResult
class ContentAnalyzerExecutor(Executor):
"""Parses the AI agent's response and prepares for human review."""
def __init__(self):
super().__init__(id="content_analyzer_executor")
@handler
async def handle_analysis(
self,
response: AgentExecutorResponse,
ctx: WorkflowContext[AnalysisWithSubmission],
) -> None:
"""Parse the AI analysis and forward with submission context."""
try:
analysis = ContentAnalysisResult.model_validate_json(response.agent_response.text)
except ValidationError:
analysis = ContentAnalysisResult(
is_appropriate=False,
risk_level="high",
concerns=["Agent execution failed or yielded invalid JSON (possible content filter)."],
recommendation="Manual review required",
)
# Retrieve the original submission from shared state
submission: ContentSubmission = ctx.get_state("current_submission")
await ctx.send_message(AnalysisWithSubmission(submission=submission, analysis=analysis))
class HumanReviewExecutor(Executor):
"""Requests human approval using MAF's request_info pattern.
This executor demonstrates the core HITL pattern:
1. Receives the AI analysis result
2. Calls ctx.request_info() to pause and request human input
3. The @response_handler method processes the human's response
"""
def __init__(self):
super().__init__(id="human_review_executor")
@handler
async def request_review(
self,
data: AnalysisWithSubmission,
ctx: WorkflowContext,
) -> None:
"""Request human review for the content.
This method:
1. Constructs the approval request with all context
2. Calls request_info to pause the workflow
3. The workflow will resume when a response is provided via the HITL endpoint
"""
submission = data.submission
analysis = data.analysis
# Construct the human-readable prompt
prompt = (
f"Please review the following content for publication:\n\n"
f"Title: {submission.title}\n"
f"Author: {submission.author}\n"
f"Content: {submission.body}\n\n"
f"AI Analysis:\n"
f"- Appropriate: {analysis.is_appropriate}\n"
f"- Risk Level: {analysis.risk_level}\n"
f"- Concerns: {', '.join(analysis.concerns) if analysis.concerns else 'None'}\n"
f"- Recommendation: {analysis.recommendation}\n\n"
f"Please approve or reject this content."
)
approval_request = HumanApprovalRequest(
content_id=submission.content_id,
title=submission.title,
body=submission.body,
author=submission.author,
ai_analysis=analysis,
prompt=prompt,
)
# Store analysis in shared state for the response handler
ctx.set_state("pending_analysis", data)
# Request human input - workflow will pause here
# The response_type specifies what we expect back
await ctx.request_info(
request_data=approval_request,
response_type=HumanApprovalResponse,
)
@response_handler
async def handle_approval_response(
self,
original_request: HumanApprovalRequest,
response: HumanApprovalResponse,
ctx: WorkflowContext[ModerationResult],
) -> None:
"""Process the human reviewer's decision.
This method is called automatically when a response to request_info is received.
The original_request contains the HumanApprovalRequest we sent.
The response contains the HumanApprovalResponse from the reviewer.
"""
logger.info(
"Human review received for content %s: approved=%s, notes=%s",
original_request.content_id,
response.approved,
response.reviewer_notes,
)
# Create the final moderation result
status = "approved" if response.approved else "rejected"
result = ModerationResult(
content_id=original_request.content_id,
status=status,
ai_analysis=original_request.ai_analysis,
reviewer_notes=response.reviewer_notes,
)
await ctx.send_message(result)
class PublishExecutor(Executor):
"""Handles the final publication or rejection of content."""
def __init__(self):
super().__init__(id="publish_executor")
@handler
async def handle_result(
self,
result: ModerationResult,
ctx: WorkflowContext[Never, str],
) -> None:
"""Finalize the moderation and yield output."""
if result.status == "approved":
message = (
f"✅ Content '{result.content_id}' has been APPROVED and published.\n"
f"Reviewer notes: {result.reviewer_notes or 'None'}"
)
else:
message = (
f"❌ Content '{result.content_id}' has been REJECTED.\n"
f"Reviewer notes: {result.reviewer_notes or 'None'}"
)
logger.info(message)
await ctx.yield_output(message)
# ============================================================================
# Input Router Executor
# ============================================================================
def _build_client_kwargs() -> dict[str, Any]:
"""Build Azure OpenAI client configuration from environment variables."""
endpoint = os.getenv(AZURE_OPENAI_ENDPOINT_ENV)
if not endpoint:
raise RuntimeError(f"{AZURE_OPENAI_ENDPOINT_ENV} environment variable is required.")
deployment = os.getenv(AZURE_OPENAI_DEPLOYMENT_ENV)
if not deployment:
raise RuntimeError(f"{AZURE_OPENAI_DEPLOYMENT_ENV} environment variable is required.")
client_kwargs: dict[str, Any] = {
"endpoint": endpoint,
"deployment_name": deployment,
}
api_key = os.getenv(AZURE_OPENAI_API_KEY_ENV)
if api_key:
client_kwargs["api_key"] = api_key
else:
client_kwargs["credential"] = AzureCliCredential()
return client_kwargs
class InputRouterExecutor(Executor):
"""Routes incoming content submission to the analysis agent."""
def __init__(self):
super().__init__(id="input_router")
@handler
async def route_input(
self,
input_json: str,
ctx: WorkflowContext[AgentExecutorRequest],
) -> None:
"""Parse input and create agent request."""
data = json.loads(input_json) if isinstance(input_json, str) else input_json
submission = ContentSubmission(
content_id=data.get("content_id", "unknown"),
title=data.get("title", "Untitled"),
body=data.get("body", ""),
author=data.get("author", "Anonymous"),
)
# Store submission in shared state for later retrieval
ctx.set_state("current_submission", submission)
# Create the agent request
message = (
f"Please analyze the following content for policy compliance:\n\n"
f"Title: {submission.title}\n"
f"Author: {submission.author}\n"
f"Content:\n{submission.body}"
)
await ctx.send_message(
AgentExecutorRequest(
messages=[Message(role="user", text=message)],
should_respond=True,
)
)
# ============================================================================
# Workflow Creation
# ============================================================================
def _create_workflow() -> Workflow:
"""Create the content moderation workflow with HITL."""
client_kwargs = _build_client_kwargs()
chat_client = AzureOpenAIChatClient(**client_kwargs)
# Create the content analysis agent
content_analyzer_agent = chat_client.as_agent(
name=CONTENT_ANALYZER_AGENT_NAME,
instructions=CONTENT_ANALYZER_INSTRUCTIONS,
default_options={"response_format": ContentAnalysisResult},
)
# Create executors
input_router = InputRouterExecutor()
content_analyzer_executor = ContentAnalyzerExecutor()
human_review_executor = HumanReviewExecutor()
publish_executor = PublishExecutor()
# Build the workflow graph
# Flow:
# input_router -> content_analyzer_agent -> content_analyzer_executor
# -> human_review_executor (HITL pause here) -> publish_executor
return (
WorkflowBuilder(start_executor=input_router)
.add_edge(input_router, content_analyzer_agent)
.add_edge(content_analyzer_agent, content_analyzer_executor)
.add_edge(content_analyzer_executor, human_review_executor)
.add_edge(human_review_executor, publish_executor)
.build()
)
# ============================================================================
# Application Entry Point
# ============================================================================
def launch(durable: bool = True) -> AgentFunctionApp | None:
"""Launch the function app or DevUI.
Args:
durable: If True, returns AgentFunctionApp for Azure Functions.
If False, launches DevUI for local MAF development.
"""
if durable:
# Azure Functions mode with Durable Functions
# The app automatically provides HITL endpoints:
# - POST /api/workflow/run - Start the workflow
# - GET /api/workflow/status/{instanceId} - Check status and pending HITL requests
# - POST /api/workflow/respond/{instanceId}/{requestId} - Send HITL response
# - GET /api/health - Health check
workflow = _create_workflow()
return AgentFunctionApp(workflow=workflow, enable_health_check=True)
# Pure MAF mode with DevUI for local development
from pathlib import Path
from agent_framework.devui import serve
from dotenv import load_dotenv
env_path = Path(__file__).parent / ".env"
load_dotenv(dotenv_path=env_path)
logger.info("Starting Workflow HITL Sample in MAF mode")
logger.info("Available at: http://localhost:8096")
logger.info("\nThis workflow demonstrates:")
logger.info("- Human-in-the-loop using request_info / @response_handler pattern")
logger.info("- AI content analysis with structured output")
logger.info("- Human approval workflow integration")
logger.info("\nFlow: InputRouter -> ContentAnalyzer Agent -> HumanReview -> Publish")
workflow = _create_workflow()
serve(entities=[workflow], port=8096, auto_open=True)
return None
# Default: Azure Functions mode
# Run with `python function_app.py --maf` for pure MAF mode with DevUI
app = launch(durable=True)
if __name__ == "__main__":
import sys
if "--maf" in sys.argv:
# Run in pure MAF mode with DevUI
launch(durable=False)
else:
print("Usage: python function_app.py --maf")
print(" --maf Run in pure MAF mode with DevUI (http://localhost:8096)")
print("\nFor Azure Functions mode, use: func start")
@@ -0,0 +1,12 @@
{
"version": "2.0",
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
},
"extensions": {
"durableTask": {
"hubName": "%TASKHUB_NAME%"
}
}
}
@@ -0,0 +1,11 @@
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None",
"TASKHUB_NAME": "default",
"FUNCTIONS_WORKER_RUNTIME": "python",
"AZURE_OPENAI_ENDPOINT": "<Your Azure OpenAI endpoint>",
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "<Your Azure OpenAI chat deployment name>"
}
}
@@ -0,0 +1,3 @@
agent-framework-azurefunctions
azure-identity
agents-maf