Files
agent-framework/python/samples/03-workflows/visualization/concurrent_with_visualization.py
T
Eduard van Valkenburg 5e056b672e Python: [BREAKING] Python: Provider-leading client design & OpenAI package extraction (#4818)
* Python: Provider-leading client design & OpenAI package extraction

Major refactoring of the Python Agent Framework client architecture:

- Extract OpenAI clients into new `agent-framework-openai` package
- Core package no longer depends on openai, azure-identity, azure-ai-projects
- Rename clients for discoverability: OpenAIResponsesClient → OpenAIChatClient,
  OpenAIChatClient → OpenAIChatCompletionClient
- Unify `model_id`/`deployment_name`/`model_deployment_name` → `model` param
- New FoundryChatClient for Azure AI Foundry Responses API
- New FoundryAgent/FoundryAgentClient for connecting to pre-configured Foundry agents
- Remove OpenAIBase/OpenAIConfigMixin from non-deprecated client MRO
- Deprecate AzureOpenAI* clients, AzureAIClient, OpenAIAssistantsClient
- Reorganize samples: azure_openai+azure_ai+azure_ai_agent → azure/
- ADR-0020: Provider-Leading Client Design

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix: missing Agent imports in samples, .model_id → .model in foundry_local sample

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix: CI failures — mypy errors, coverage targets, sample imports

- azure-ai mypy: add type ignores for TypedDict total=, model arg, forward ref
- Coverage: replace core.azure/openai targets with openai package target
- project_provider: add type annotation for opts dict

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix: populate openai .pyi stub, fix broken README links, coverage targets

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fixes

* updated observabilitty

* reset azure init.pyi

* fix errors

* updated adr number

* fix foundry local

* fixed not renamed docstrings and comments, and added deprecated markers to old classes

* fix tests and pyprojects

* fix test vars

* updated function tests

* update durable

* updated test setup for functions

* Fix Foundry auth in workflow samples

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Stabilize Python integration workflows

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Update hosting samples for Foundry

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Trigger full CI rerun

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Trigger CI rerun again

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* trigger rerun

* trigger rerun

* fix for litellm

* undo durabletask changes

* Move Foundry APIs into foundry namespace

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix Foundry pyproject formatting

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Split provider samples by Foundry surface

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Restore hosting sample requirements

Also fix the Foundry Local sample link after the provider sample move.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* updated tests

* udpated foundry integration tests

* removed dist from azurefunctions tests

* Use separate Foundry clients for concurrent agents

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix client setup in azfunc and durable

* disabled two tests

* updated setup for some function and durable tests

* improved azure openai setup with new clients

* ignore deprecated

* fixes

* skip 11

* remove openai assistants int tests

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-25 09:56:29 +00:00

175 lines
5.7 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
import asyncio
import os
from dataclasses import dataclass
from agent_framework import (
Agent,
AgentExecutor,
AgentExecutorRequest,
AgentExecutorResponse,
Executor,
Message,
WorkflowBuilder,
WorkflowContext,
WorkflowViz,
handler,
)
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
from dotenv import load_dotenv
from typing_extensions import Never
# Load environment variables from .env file
load_dotenv()
"""
Sample: Concurrent (Fan-out/Fan-in) with Agents + Visualization
What it does:
- Fan-out: dispatch the same prompt to multiple domain agents (research, marketing, legal).
- Fan-in: aggregate their responses into one consolidated output.
- Visualization: generate Mermaid and GraphViz representations via `WorkflowViz` and optionally export SVG.
Prerequisites:
- FOUNDRY_PROJECT_ENDPOINT must be your Azure AI Foundry Agent Service (V2) project endpoint.
- Azure AI/ Azure OpenAI for `FoundryChatClient` agents.
- Authentication via `azure-identity` — uses `AzureCliCredential()` (run `az login`).
- For visualization export: `pip install graphviz>=0.20.0` and install GraphViz binaries.
"""
class DispatchToExperts(Executor):
"""Dispatches the incoming prompt to all expert agent executors (fan-out)."""
@handler
async def dispatch(self, prompt: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
# Wrap the incoming prompt as a user message for each expert and request a response.
initial_message = Message("user", text=prompt)
await ctx.send_message(AgentExecutorRequest(messages=[initial_message], should_respond=True))
@dataclass
class AggregatedInsights:
"""Structured output from the aggregator."""
research: str
marketing: str
legal: str
class AggregateInsights(Executor):
"""Aggregates expert agent responses into a single consolidated result (fan-in)."""
@handler
async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None:
# Map responses to text by executor id for a simple, predictable demo.
by_id: dict[str, str] = {}
for r in results:
# AgentExecutorResponse.agent_response.text contains concatenated assistant text
by_id[r.executor_id] = r.agent_response.text
research_text = by_id.get("researcher", "")
marketing_text = by_id.get("marketer", "")
legal_text = by_id.get("legal", "")
aggregated = AggregatedInsights(
research=research_text,
marketing=marketing_text,
legal=legal_text,
)
# Provide a readable, consolidated string as the final workflow result.
consolidated = (
"Consolidated Insights\n"
"====================\n\n"
f"Research Findings:\n{aggregated.research}\n\n"
f"Marketing Angle:\n{aggregated.marketing}\n\n"
f"Legal/Compliance Notes:\n{aggregated.legal}\n"
)
await ctx.yield_output(consolidated)
async def main() -> None:
"""Build and run the concurrent workflow with visualization."""
# Create agent instances
researcher = AgentExecutor(
Agent(
client=FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
credential=AzureCliCredential(),
),
instructions=(
"You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
" opportunities, and risks."
),
name="researcher",
)
)
marketer = AgentExecutor(
Agent(
client=FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
credential=AzureCliCredential(),
),
instructions=(
"You're a creative marketing strategist. Craft compelling value propositions and target messaging"
" aligned to the prompt."
),
name="marketer",
)
)
legal = AgentExecutor(
Agent(
client=FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
credential=AzureCliCredential(),
),
instructions=(
"You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
" based on the prompt."
),
name="legal",
)
)
# Create executor instances
dispatcher = DispatchToExperts(id="dispatcher")
aggregator = AggregateInsights(id="aggregator")
# Build a simple fan-out/fan-in workflow
workflow = (
WorkflowBuilder(start_executor=dispatcher)
.add_fan_out_edges(dispatcher, [researcher, marketer, legal])
.add_fan_in_edges([researcher, marketer, legal], aggregator)
.build()
)
# Generate workflow visualization
print("Generating workflow visualization...")
viz = WorkflowViz(workflow)
# Print out the mermaid string.
print("Mermaid string: \n=======")
print(viz.to_mermaid())
print("=======")
# Print out the DiGraph string with internal executors.
print("DiGraph string: \n=======")
print(viz.to_digraph(include_internal_executors=True))
print("=======")
# Export the DiGraph visualization as SVG.
svg_file = viz.export(format="svg")
print(f"SVG file saved to: {svg_file}")
if __name__ == "__main__":
asyncio.run(main())