agent-framework/python/samples/03-workflows/state-management/state_with_agents.py
Eduard van Valkenburg a2856d3b92 Python: restructure: Python samples into progressive 01-05 layout (#3862)
* restructure: Python samples into progressive 01-05 layout

- 01-get-started/: 6 numbered steps (hello agent → hosting)
- 02-agents/: all agent concept samples (tools, middleware, providers, etc.)
- 03-workflows/: ALL existing workflow samples preserved as-is
- 04-hosting/: azure-functions, durabletask, a2a
- 05-end-to-end/: demos, evaluation, hosted agents
- Old files moved to _to_delete/ for review
- Added AGENTS.md with structure documentation
- autogen-migration/ and semantic-kernel-migration/ preserved at root

* fix: switch to AzureOpenAI Foundry, fix CI failures

- Switch all 01-get-started samples to AzureOpenAIResponsesClient with
  Azure AI Foundry project endpoint (AZURE_AI_PROJECT_ENDPOINT +
  AZURE_OPENAI_RESPONSES_DEPLOYMENT_NAME + AzureCliCredential)
- Add _to_delete/ and 05-end-to-end/ to pyrightconfig.samples.json excludes
- Fix test paths in packages/ that referenced old getting_started/ dirs:
  durabletask conftest + streaming test, azurefunctions conftest,
  devui conftest + capture_messages + openai_sdk_integration
- Fix workflow_as_agent_human_in_the_loop.py import (sibling import)
- Update hosting READMEs and tool comment paths
- Replace root README.md with new structure overview
- Update AGENTS.md to document Azure OpenAI Foundry as default provider

* cleanup: remove _to_delete folder, copy resource files to active dirs

All files in _to_delete/ were either:
- Exact duplicates of files in the new structure (240 files)
- Same file with only comment path updates (100 files)
- One import-fix diff (workflow_as_agent_human_in_the_loop.py)
- One superseded minimal_sample.py

Resource files (sample.pdf, countries.json, employees.pdf, weather.json)
copied to 02-agents/sample_assets/ and 02-agents/resources/ since active
samples reference them.

* fix: address PR review comments, centralize resources, remove root duplicates

- Fix type annotation in 04_memory.py (string union -> proper types)
- Fix old sample paths in observability files
- Fix grammar/spelling in observability samples
- Move sample_assets/ and resources/ to shared/ folder
- Remove 8 duplicate observability files from 02-agents root
- Update resource path references in multimodal_input and provider samples

* fix: update broken links from old getting_started paths to new structure

- Update relative paths in READMEs: getting_started/ → 01-get-started/,
  02-agents/, 03-workflows/, 04-hosting/, 05-end-to-end/
- Fix absolute GitHub URLs in package READMEs
- Fix broken link in ollama package README

* fix: convert absolute GitHub URLs to relative paths for link checker

Absolute URLs to python/samples/ on main branch 404 until PR merges.
Converted to relative paths that linkspector can verify locally.

* fix: update link for handoff sample moved to orchestrations/

* fix: update chatkit-integration README path from demos/ to 05-end-to-end/

* fix: update broken links in orchestrations README to match flat directory structure
2026-02-12 17:36:36 +00:00

243 lines
9.1 KiB
Python

# Copyright (c) Microsoft. All rights reserved.

import asyncio
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from uuid import uuid4

from agent_framework import (
    Agent,
    AgentExecutorRequest,
    AgentExecutorResponse,
    Message,
    WorkflowBuilder,
    WorkflowContext,
    executor,
)
from agent_framework.azure import AzureOpenAIResponsesClient
from azure.identity import AzureCliCredential
from pydantic import BaseModel
from typing_extensions import Never

"""
Sample: Workflow state with agents and conditional routing.

Store an email once by id, classify it with a detector agent, then either draft a reply with an assistant
agent or finish with a spam notice. Stream events as the workflow runs.

Purpose:
Show how to:
- Use workflow state to decouple large payloads from messages and pass around lightweight references.
- Enforce structured agent outputs with Pydantic models via response_format for robust parsing.
- Route using conditional edges based on a typed intermediate DetectionResult.
- Compose agent backed executors with function style executors and yield the final output when the workflow completes.

Prerequisites:
- AZURE_AI_PROJECT_ENDPOINT must be your Azure AI Foundry Agent Service (V2) project endpoint.
- Azure OpenAI configured for AzureOpenAIResponsesClient with required environment variables.
- Authentication via azure-identity. Use AzureCliCredential and run az login before executing the sample.
- Familiarity with WorkflowBuilder, executors, conditional edges, and streaming runs.
"""

EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"


class DetectionResultAgent(BaseModel):
    """Structured output returned by the spam detection agent."""

    is_spam: bool
    reason: str


class EmailResponse(BaseModel):
    """Structured output returned by the email assistant agent."""

    response: str


@dataclass
class DetectionResult:
    """Internal detection result enriched with the state email_id for later lookups."""

    is_spam: bool
    reason: str
    email_id: str


@dataclass
class Email:
    """In memory record stored in state to avoid re-sending large bodies on edges."""

    email_id: str
    email_content: str


def get_condition(expected_result: bool):
    """Create a condition predicate for DetectionResult.is_spam.

    Contract:
    - If the message is not a DetectionResult, allow it to pass to avoid accidental dead ends.
    - Otherwise, return True only when is_spam matches expected_result.
    """

    def condition(message: Any) -> bool:
        if not isinstance(message, DetectionResult):
            return True
        return message.is_spam == expected_result

    return condition


@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Persist the raw email content in state and trigger spam detection.

    Responsibilities:
    - Generate a unique email_id (UUID) for downstream retrieval.
    - Store the Email object under a namespaced key and set the current id pointer.
    - Emit an AgentExecutorRequest asking the detector to respond.
    """
    new_email = Email(email_id=str(uuid4()), email_content=email_text)
    ctx.set_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
    ctx.set_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)

    await ctx.send_message(
        AgentExecutorRequest(messages=[Message("user", text=new_email.email_content)], should_respond=True)
    )


@executor(id="to_detection_result")
async def to_detection_result(response: AgentExecutorResponse, ctx: WorkflowContext[DetectionResult]) -> None:
    """Parse spam detection JSON into a structured model and enrich with email_id.

    Steps:
    1) Validate the agent's JSON output into DetectionResultAgent.
    2) Retrieve the current email_id from workflow state.
    3) Send a typed DetectionResult for conditional routing.
    """
    parsed = DetectionResultAgent.model_validate_json(response.agent_response.text)
    email_id: str = ctx.get_state(CURRENT_EMAIL_ID_KEY)
    await ctx.send_message(DetectionResult(is_spam=parsed.is_spam, reason=parsed.reason, email_id=email_id))


@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(detection: DetectionResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Forward non spam email content to the drafting agent.

    Guard:
    - This path should only receive non spam. Raise if misrouted.
    """
    if detection.is_spam:
        raise RuntimeError("This executor should only handle non-spam messages.")

    # Load the original content by id from workflow state and forward it to the assistant.
    email: Email = ctx.get_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[Message("user", text=email.email_content)], should_respond=True)
    )


@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Validate the drafted reply and yield the final output."""
    parsed = EmailResponse.model_validate_json(response.agent_response.text)
    await ctx.yield_output(f"Email sent: {parsed.response}")


@executor(id="handle_spam")
async def handle_spam(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
    """Yield output describing why the email was marked as spam."""
    if detection.is_spam:
        await ctx.yield_output(f"Email marked as spam: {detection.reason}")
    else:
        raise RuntimeError("This executor should only handle spam messages.")


def create_spam_detection_agent() -> Agent:
    """Creates a spam detection agent."""
    return AzureOpenAIResponsesClient(
        project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
        deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
        credential=AzureCliCredential(),
    ).as_agent(
        instructions=(
            "You are a spam detection assistant that identifies spam emails. "
            "Always return JSON with fields is_spam (bool) and reason (string)."
        ),
        # response_format enforces structured JSON from each agent.
        default_options={"response_format": DetectionResultAgent},
        name="spam_detection_agent",
    )


def create_email_assistant_agent() -> Agent:
    """Creates an email assistant agent."""
    return AzureOpenAIResponsesClient(
        project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
        deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
        credential=AzureCliCredential(),
    ).as_agent(
        instructions=(
            "You are an email assistant that helps users draft responses to emails with professionalism. "
            "Return JSON with a single field 'response' containing the drafted reply."
        ),
        # response_format enforces structured JSON from each agent.
        default_options={"response_format": EmailResponse},
        name="email_assistant_agent",
    )


async def main() -> None:
    """Build and run the email workflow, which combines workflow state, agents, and conditional routing."""
    # Build the workflow graph with conditional edges.
    # Flow:
    #   store_email -> spam_detection_agent -> to_detection_result -> branch:
    #     False -> submit_to_email_assistant -> email_assistant_agent -> finalize_and_send
    #     True  -> handle_spam
    spam_detection_agent = create_spam_detection_agent()
    email_assistant_agent = create_email_assistant_agent()

    workflow = (
        WorkflowBuilder(start_executor=store_email)
        .add_edge(store_email, spam_detection_agent)
        .add_edge(spam_detection_agent, to_detection_result)
        .add_edge(to_detection_result, submit_to_email_assistant, condition=get_condition(False))
        .add_edge(to_detection_result, handle_spam, condition=get_condition(True))
        .add_edge(submit_to_email_assistant, email_assistant_agent)
        .add_edge(email_assistant_agent, finalize_and_send)
        .build()
    )

    # Read an email from resources/spam.txt if available; otherwise use a default sample.
    current_file = Path(__file__)
    resources_path = current_file.parent.parent / "resources" / "spam.txt"
    if resources_path.exists():
        email = resources_path.read_text(encoding="utf-8")
    else:
        print("Unable to find resource file, using default text.")
        email = "You are a WINNER! Click here for a free lottery offer!!!"

    # Run the workflow to completion, then print the first yielded output, if any.
    events = await workflow.run(email)
    outputs = events.get_outputs()

    if outputs:
        print(f"Final result: {outputs[0]}")

    """
    Sample Output:

    Final result: Email marked as spam: This email exhibits several common spam and scam characteristics:
    unrealistic claims of large cash winnings, urgent time pressure, requests for sensitive personal and financial
    information, and a demand for a processing fee. The sender impersonates a generic lottery commission, and the
    message contains a suspicious link. All these are typical of phishing and lottery scam emails.
    """


if __name__ == "__main__":
    asyncio.run(main())
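
The state-reference and conditional-routing pattern in this sample can be tried without Azure credentials. The block below is a minimal sketch, assuming a plain dict stands in for workflow state and a keyword check stands in for the detector agent. `STATE`, `classify`, and `route` are hypothetical names used only for illustration and are not part of agent_framework; the standalone `store_email` here is a simplified synchronous analogue of the executor above, and `get_condition` mirrors the contract documented in the sample.

```python
from dataclasses import dataclass
from typing import Any, Callable
from uuid import uuid4

EMAIL_STATE_PREFIX = "email:"

# Hypothetical stand-in for workflow state: a plain in-memory dict.
STATE: dict[str, Any] = {}


@dataclass
class DetectionResult:
    is_spam: bool
    reason: str
    email_id: str


def get_condition(expected_result: bool) -> Callable[[Any], bool]:
    """Same contract as the sample: non-DetectionResult messages pass through."""

    def condition(message: Any) -> bool:
        if not isinstance(message, DetectionResult):
            return True
        return message.is_spam == expected_result

    return condition


def store_email(email_text: str) -> str:
    """Store the body once under a namespaced key and return only its id."""
    email_id = str(uuid4())
    STATE[f"{EMAIL_STATE_PREFIX}{email_id}"] = email_text
    return email_id


def classify(email_id: str) -> DetectionResult:
    """Hypothetical keyword check standing in for the detector agent."""
    text = STATE[f"{EMAIL_STATE_PREFIX}{email_id}"].lower()
    is_spam = "lottery" in text or "winner" in text
    reason = "contains lottery-style bait" if is_spam else "no spam markers found"
    return DetectionResult(is_spam=is_spam, reason=reason, email_id=email_id)


def route(detection: DetectionResult) -> str:
    """Pick a branch with the same predicates the conditional edges use."""
    if get_condition(True)(detection):
        return f"Email marked as spam: {detection.reason}"
    # Non-spam branch: only the id travelled with the message, so load the body from state.
    body = STATE[f"{EMAIL_STATE_PREFIX}{detection.email_id}"]
    return f"Forwarded to assistant: {body}"
```

Calling `route(classify(store_email(text)))` follows the same path as the workflow graph: the body is written to state once, and only the lightweight `DetectionResult` carrying the `email_id` moves between steps.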