Files
Copilot b05fc9e849 Python: Add load_dotenv() to samples for .env file support (#4043)
* Initial plan

* Add load_dotenv() to 303 Python samples for environment variable loading

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Update SAMPLE_GUIDELINES.md to document load_dotenv() requirement

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Update samples README.md to document .env file usage

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Run ruff format on all changed sample files

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Clarify load_dotenv() usage in README - local dev vs production

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Remove deprecated getting_started folder as requested

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Document env_file_path parameter for per-client configuration

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Merge main branch to resolve conflicts

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Fix run_evaluation.py file that was empty in merge commit

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Remove dotnet changes from merge - out of scope for this PR

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Remove package and test changes from merge - only sample changes needed

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Remove test_func_utils.py - only sample changes needed

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Revert sample files not in original changeset - keep only load_dotenv additions

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Move load_dotenv() outside snippet tag in 06_host_your_agent.py

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Fix comment placement - move load_dotenv before code comments

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Fix load_dotenv() placement across all samples - after docstring, before code comments

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Merge latest main branch with load_dotenv changes

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Remove non-sample changes from merge - keep only load_dotenv additions

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Revert non-load_dotenv sample changes from merge

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Fix run_evaluation.py - use main's improved version (file already had load_dotenv)

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Manual update

* Manual update 2

* Fix Role usage and load_dotenv placement per PR review feedback

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Fix Role usage - use string literals not enum attributes

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Fix SAMPLE_GUIDELINES.md example - load_dotenv before docstring per guidance

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Move load_dotenv() before docstrings in all samples per SAMPLE_GUIDELINES ordering

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Address PR review: rename files, fix placement, add session usage, remove note

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Update Redis README to reference renamed file redis_history_provider.py

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>
Co-authored-by: Tao Chen <taochen@microsoft.com>
Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>
Co-authored-by: Eduard van Valkenburg <eavanvalkenburg@users.noreply.github.com>
2026-02-19 10:55:13 +00:00

247 lines
9.2 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
import asyncio
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from uuid import uuid4
from agent_framework import (
Agent,
AgentExecutorRequest,
AgentExecutorResponse,
Message,
WorkflowBuilder,
WorkflowContext,
executor,
)
from agent_framework.azure import AzureOpenAIResponsesClient
from azure.identity import AzureCliCredential
from dotenv import load_dotenv
from pydantic import BaseModel
from typing_extensions import Never
# Load environment variables from .env file
load_dotenv()
"""
Sample: Workflow state with agents and conditional routing.
Store an email once by id, classify it with a detector agent, then either draft a reply with an assistant
agent or finish with a spam notice. Stream events as the workflow runs.
Purpose:
Show how to:
- Use workflow state to decouple large payloads from messages and pass around lightweight references.
- Enforce structured agent outputs with Pydantic models via response_format for robust parsing.
- Route using conditional edges based on a typed intermediate DetectionResult.
- Compose agent backed executors with function style executors and yield the final output when the workflow completes.
Prerequisites:
- AZURE_AI_PROJECT_ENDPOINT must be your Azure AI Foundry Agent Service (V2) project endpoint.
- Azure OpenAI configured for AzureOpenAIResponsesClient with required environment variables.
- Authentication via azure-identity. Use AzureCliCredential and run az login before executing the sample.
- Familiarity with WorkflowBuilder, executors, conditional edges, and streaming runs.
"""
EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"
class DetectionResultAgent(BaseModel):
"""Structured output returned by the spam detection agent."""
is_spam: bool
reason: str
class EmailResponse(BaseModel):
"""Structured output returned by the email assistant agent."""
response: str
@dataclass
class DetectionResult:
"""Internal detection result enriched with the state email_id for later lookups."""
is_spam: bool
reason: str
email_id: str
@dataclass
class Email:
"""In memory record stored in state to avoid re-sending large bodies on edges."""
email_id: str
email_content: str
def get_condition(expected_result: bool):
"""Create a condition predicate for DetectionResult.is_spam.
Contract:
- If the message is not a DetectionResult, allow it to pass to avoid accidental dead ends.
- Otherwise, return True only when is_spam matches expected_result.
"""
def condition(message: Any) -> bool:
if not isinstance(message, DetectionResult):
return True
return message.is_spam == expected_result
return condition
@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Persist the raw email content in state and trigger spam detection.
Responsibilities:
- Generate a unique email_id (UUID) for downstream retrieval.
- Store the Email object under a namespaced key and set the current id pointer.
- Emit an AgentExecutorRequest asking the detector to respond.
"""
new_email = Email(email_id=str(uuid4()), email_content=email_text)
ctx.set_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
ctx.set_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)
await ctx.send_message(
AgentExecutorRequest(messages=[Message("user", text=new_email.email_content)], should_respond=True)
)
@executor(id="to_detection_result")
async def to_detection_result(response: AgentExecutorResponse, ctx: WorkflowContext[DetectionResult]) -> None:
"""Parse spam detection JSON into a structured model and enrich with email_id.
Steps:
1) Validate the agent's JSON output into DetectionResultAgent.
2) Retrieve the current email_id from workflow state.
3) Send a typed DetectionResult for conditional routing.
"""
parsed = DetectionResultAgent.model_validate_json(response.agent_response.text)
email_id: str = ctx.get_state(CURRENT_EMAIL_ID_KEY)
await ctx.send_message(DetectionResult(is_spam=parsed.is_spam, reason=parsed.reason, email_id=email_id))
@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(detection: DetectionResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Forward non spam email content to the drafting agent.
Guard:
- This path should only receive non spam. Raise if misrouted.
"""
if detection.is_spam:
raise RuntimeError("This executor should only handle non-spam messages.")
# Load the original content by id from workflow state and forward it to the assistant.
email: Email = ctx.get_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
await ctx.send_message(
AgentExecutorRequest(messages=[Message("user", text=email.email_content)], should_respond=True)
)
@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
"""Validate the drafted reply and yield the final output."""
parsed = EmailResponse.model_validate_json(response.agent_response.text)
await ctx.yield_output(f"Email sent: {parsed.response}")
@executor(id="handle_spam")
async def handle_spam(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
"""Yield output describing why the email was marked as spam."""
if detection.is_spam:
await ctx.yield_output(f"Email marked as spam: {detection.reason}")
else:
raise RuntimeError("This executor should only handle spam messages.")
def create_spam_detection_agent() -> Agent:
"""Creates a spam detection agent."""
return AzureOpenAIResponsesClient(
project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
credential=AzureCliCredential(),
).as_agent(
instructions=(
"You are a spam detection assistant that identifies spam emails. "
"Always return JSON with fields is_spam (bool) and reason (string)."
),
default_options={"response_format": DetectionResultAgent},
# response_format enforces structured JSON from each agent.
name="spam_detection_agent",
)
def create_email_assistant_agent() -> Agent:
"""Creates an email assistant agent."""
return AzureOpenAIResponsesClient(
project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
credential=AzureCliCredential(),
).as_agent(
instructions=(
"You are an email assistant that helps users draft responses to emails with professionalism. "
"Return JSON with a single field 'response' containing the drafted reply."
),
# response_format enforces structured JSON from each agent.
default_options={"response_format": EmailResponse},
name="email_assistant_agent",
)
async def main() -> None:
"""Build and run the workflow state with agents and conditional routing workflow."""
# Build the workflow graph with conditional edges.
# Flow:
# store_email -> spam_detection_agent -> to_detection_result -> branch:
# False -> submit_to_email_assistant -> email_assistant_agent -> finalize_and_send
# True -> handle_spam
spam_detection_agent = create_spam_detection_agent()
email_assistant_agent = create_email_assistant_agent()
workflow = (
WorkflowBuilder(start_executor=store_email)
.add_edge(store_email, spam_detection_agent)
.add_edge(spam_detection_agent, to_detection_result)
.add_edge(to_detection_result, submit_to_email_assistant, condition=get_condition(False))
.add_edge(to_detection_result, handle_spam, condition=get_condition(True))
.add_edge(submit_to_email_assistant, email_assistant_agent)
.add_edge(email_assistant_agent, finalize_and_send)
.build()
)
# Read an email from resources/spam.txt if available; otherwise use a default sample.
current_file = Path(__file__)
resources_path = current_file.parent.parent / "resources" / "spam.txt"
if resources_path.exists():
email = resources_path.read_text(encoding="utf-8")
else:
print("Unable to find resource file, using default text.")
email = "You are a WINNER! Click here for a free lottery offer!!!"
# Run and print the final result. Streaming surfaces intermediate execution events as well.
events = await workflow.run(email)
outputs = events.get_outputs()
if outputs:
print(f"Final result: {outputs[0]}")
"""
Sample Output:
Final result: Email marked as spam: This email exhibits several common spam and scam characteristics:
unrealistic claims of large cash winnings, urgent time pressure, requests for sensitive personal and financial
information, and a demand for a processing fee. The sender impersonates a generic lottery commission, and the
message contains a suspicious link. All these are typical of phishing and lottery scam emails.
"""
if __name__ == "__main__":
asyncio.run(main())