# Copyright (c) Microsoft. All rights reserved. import asyncio import os from dataclasses import dataclass from agent_framework import ( Agent, AgentExecutor, AgentExecutorRequest, AgentExecutorResponse, Executor, Message, WorkflowBuilder, WorkflowContext, WorkflowViz, handler, ) from agent_framework.foundry import FoundryChatClient from azure.identity import AzureCliCredential from dotenv import load_dotenv from typing_extensions import Never # Load environment variables from .env file load_dotenv() """ Sample: Concurrent (Fan-out/Fan-in) with Agents + Visualization What it does: - Fan-out: dispatch the same prompt to multiple domain agents (research, marketing, legal). - Fan-in: aggregate their responses into one consolidated output. - Visualization: generate Mermaid and GraphViz representations via `WorkflowViz` and optionally export SVG. Prerequisites: - FOUNDRY_PROJECT_ENDPOINT must be your Azure AI Foundry Agent Service (V2) project endpoint. - FOUNDRY_MODEL must be set to your Azure OpenAI model deployment name. - Authentication via `azure-identity` — uses `AzureCliCredential()` (run `az login`). - For visualization export: `pip install graphviz>=0.20.0` and install GraphViz binaries. """ class DispatchToExperts(Executor): """Dispatches the incoming prompt to all expert agent executors (fan-out).""" @handler async def dispatch(self, prompt: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None: # Wrap the incoming prompt as a user message for each expert and request a response. initial_message = Message("user", contents=[prompt]) await ctx.send_message(AgentExecutorRequest(messages=[initial_message], should_respond=True)) @dataclass class AggregatedInsights: """Structured output from the aggregator.""" research: str marketing: str legal: str class AggregateInsights(Executor): """Aggregates expert agent responses into a single consolidated result (fan-in).""" @handler async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None: # Map responses to text by executor id for a simple, predictable demo. by_id: dict[str, str] = {} for r in results: # AgentExecutorResponse.agent_response.text contains concatenated assistant text by_id[r.executor_id] = r.agent_response.text research_text = by_id.get("researcher", "") marketing_text = by_id.get("marketer", "") legal_text = by_id.get("legal", "") aggregated = AggregatedInsights( research=research_text, marketing=marketing_text, legal=legal_text, ) # Provide a readable, consolidated string as the final workflow result. consolidated = ( "Consolidated Insights\n" "====================\n\n" f"Research Findings:\n{aggregated.research}\n\n" f"Marketing Angle:\n{aggregated.marketing}\n\n" f"Legal/Compliance Notes:\n{aggregated.legal}\n" ) await ctx.yield_output(consolidated) async def main() -> None: """Build and run the concurrent workflow with visualization.""" # Create agent instances researcher = AgentExecutor( Agent( client=FoundryChatClient( project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"], model=os.environ["FOUNDRY_MODEL"], credential=AzureCliCredential(), ), instructions=( "You're an expert market and product researcher. Given a prompt, provide concise, factual insights," " opportunities, and risks." ), name="researcher", ) ) marketer = AgentExecutor( Agent( client=FoundryChatClient( project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"], model=os.environ["FOUNDRY_MODEL"], credential=AzureCliCredential(), ), instructions=( "You're a creative marketing strategist. Craft compelling value propositions and target messaging" " aligned to the prompt." ), name="marketer", ) ) legal = AgentExecutor( Agent( client=FoundryChatClient( project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"], model=os.environ["FOUNDRY_MODEL"], credential=AzureCliCredential(), ), instructions=( "You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns" " based on the prompt." ), name="legal", ) ) # Create executor instances dispatcher = DispatchToExperts(id="dispatcher") aggregator = AggregateInsights(id="aggregator") # Build a simple fan-out/fan-in workflow workflow = ( WorkflowBuilder(start_executor=dispatcher) .add_fan_out_edges(dispatcher, [researcher, marketer, legal]) .add_fan_in_edges([researcher, marketer, legal], aggregator) .build() ) # Generate workflow visualization print("Generating workflow visualization...") viz = WorkflowViz(workflow) # Print out the mermaid string. print("Mermaid string: \n=======") print(viz.to_mermaid()) print("=======") # Print out the DiGraph string with internal executors. print("DiGraph string: \n=======") print(viz.to_digraph(include_internal_executors=True)) print("=======") # Export the DiGraph visualization as SVG. svg_file = viz.export(format="svg") print(f"SVG file saved to: {svg_file}") if __name__ == "__main__": asyncio.run(main())