mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
3446eb8d5d
* updates to final deprecated pieces and versions * fix mypy * fix readme links
175 lines
5.7 KiB
Python
175 lines
5.7 KiB
Python
# Copyright (c) Microsoft. All rights reserved.
|
|
|
|
import asyncio
|
|
import os
|
|
from dataclasses import dataclass
|
|
|
|
from agent_framework import (
|
|
Agent,
|
|
AgentExecutor,
|
|
AgentExecutorRequest,
|
|
AgentExecutorResponse,
|
|
Executor,
|
|
Message,
|
|
WorkflowBuilder,
|
|
WorkflowContext,
|
|
WorkflowViz,
|
|
handler,
|
|
)
|
|
from agent_framework.foundry import FoundryChatClient
|
|
from azure.identity import AzureCliCredential
|
|
from dotenv import load_dotenv
|
|
from typing_extensions import Never
|
|
|
|
# Read the .env file so the FOUNDRY_* settings that main() looks up in os.environ can live there.
load_dotenv()
|
|
|
|
"""
|
|
Sample: Concurrent (Fan-out/Fan-in) with Agents + Visualization
|
|
|
|
What it does:
|
|
- Fan-out: dispatch the same prompt to multiple domain agents (research, marketing, legal).
|
|
- Fan-in: aggregate their responses into one consolidated output.
|
|
- Visualization: generate Mermaid and GraphViz representations via `WorkflowViz` and optionally export SVG.
|
|
|
|
Prerequisites:
|
|
- FOUNDRY_PROJECT_ENDPOINT must be your Azure AI Foundry Agent Service (V2) project endpoint.
|
|
- FOUNDRY_MODEL must be set to your Azure OpenAI model deployment name.
|
|
- Authentication via `azure-identity` — uses `AzureCliCredential()` (run `az login`).
|
|
- For visualization export: `pip install "graphviz>=0.20.0"` (quoted so the shell does not treat `>` as a redirect) and install the GraphViz binaries.
|
|
"""
|
|
|
|
|
|
class DispatchToExperts(Executor):
    """Fan-out stage: turn the incoming prompt into a request for the expert agent executors."""

    @handler
    async def dispatch(self, prompt: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
        """Send ``prompt`` onward as a single user-role message.

        Args:
            prompt: The text this executor received as its input.
            ctx: Workflow context through which the ``AgentExecutorRequest`` is sent.
        """
        # One request carrying one user message; should_respond=True asks each
        # receiving expert to produce a response to it.
        request = AgentExecutorRequest(
            messages=[Message("user", contents=[prompt])],
            should_respond=True,
        )
        await ctx.send_message(request)
|
|
|
|
|
|
@dataclass
class AggregatedInsights:
    """Holds the three expert answers collected by the aggregator, one text field per domain."""

    # Text attributed to the research expert.
    research: str
    # Text attributed to the marketing expert.
    marketing: str
    # Text attributed to the legal/compliance expert.
    legal: str
|
|
|
|
|
|
class AggregateInsights(Executor):
    """Fan-in stage: combine the expert agents' responses into one consolidated report."""

    @handler
    async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None:
        """Merge the collected expert responses and yield them as one formatted string.

        Args:
            results: Responses gathered from the expert executors.
            ctx: Workflow context used to yield the final string output.
        """
        # Index each response's text by the id of the executor that produced it.
        # If two responses share an id, the later one wins.
        text_by_executor: dict[str, str] = {
            reply.executor_id: reply.agent_response.text for reply in results
        }

        # An expert with no response contributes an empty string instead of failing.
        insights = AggregatedInsights(
            research=text_by_executor.get("researcher", ""),
            marketing=text_by_executor.get("marketer", ""),
            legal=text_by_executor.get("legal", ""),
        )

        # Human-readable report that becomes the workflow's output.
        report = "".join(
            (
                "Consolidated Insights\n",
                "====================\n\n",
                f"Research Findings:\n{insights.research}\n\n",
                f"Marketing Angle:\n{insights.marketing}\n\n",
                f"Legal/Compliance Notes:\n{insights.legal}\n",
            )
        )

        await ctx.yield_output(report)
|
|
|
|
|
|
def _make_expert(name: str, instructions: str) -> AgentExecutor:
    """Create an AgentExecutor wrapping a Foundry-backed Agent.

    Every call builds its own FoundryChatClient and AzureCliCredential, so the
    experts do not share a client instance.

    Args:
        name: Agent name. AggregateInsights looks responses up under
            "researcher", "marketer" and "legal" — presumably the executor id
            defaults to the agent name (TODO confirm), so keep them in sync.
        instructions: System instructions describing the expert's role.

    Returns:
        The executor wrapping the newly created agent.

    Raises:
        KeyError: If FOUNDRY_PROJECT_ENDPOINT or FOUNDRY_MODEL is not set.
    """
    return AgentExecutor(
        Agent(
            client=FoundryChatClient(
                project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
                model=os.environ["FOUNDRY_MODEL"],
                credential=AzureCliCredential(),
            ),
            instructions=instructions,
            name=name,
        )
    )


async def main() -> None:
    """Build the concurrent fan-out/fan-in workflow and print its visualizations.

    The workflow is only constructed and rendered here (Mermaid string, DiGraph
    string, and an SVG export); it is not executed with a prompt.
    """
    # Create the three domain experts.
    researcher = _make_expert(
        "researcher",
        "You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
        " opportunities, and risks.",
    )
    marketer = _make_expert(
        "marketer",
        "You're a creative marketing strategist. Craft compelling value propositions and target messaging"
        " aligned to the prompt.",
    )
    legal = _make_expert(
        "legal",
        "You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
        " based on the prompt.",
    )

    # Create executor instances
    dispatcher = DispatchToExperts(id="dispatcher")
    aggregator = AggregateInsights(id="aggregator")

    # Build a simple fan-out/fan-in workflow
    experts = [researcher, marketer, legal]
    workflow = (
        WorkflowBuilder(start_executor=dispatcher)
        .add_fan_out_edges(dispatcher, experts)
        .add_fan_in_edges(experts, aggregator)
        .build()
    )

    # Generate workflow visualization
    print("Generating workflow visualization...")
    viz = WorkflowViz(workflow)
    # Print out the mermaid string.
    print("Mermaid string: \n=======")
    print(viz.to_mermaid())
    print("=======")
    # Print out the DiGraph string with internal executors.
    print("DiGraph string: \n=======")
    print(viz.to_digraph(include_internal_executors=True))
    print("=======")

    # Export the DiGraph visualization as SVG. This step is optional: it needs
    # the graphviz Python package and the GraphViz binaries, so a missing
    # install is reported instead of crashing after the text output succeeded.
    # NOTE(review): assumes WorkflowViz.export raises ImportError when graphviz
    # is unavailable — confirm against the installed agent_framework version.
    try:
        svg_file = viz.export(format="svg")
    except ImportError as exc:
        print(f"Skipping SVG export: {exc}")
    else:
        print(f"SVG file saved to: {svg_file}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Only when run as a script: drive the async entry point to completion.
    asyncio.run(main())
|