[BREAKING] Python: Refactor workflows kwargs (#5010)

* Refactor workflows kwargs usage

* Update sample

* Add tests

* Update samples

* Fix formatting

* Comments

* Comments 2

* Comments 3

* Fix test and typing
This commit is contained in:
Tao Chen
2026-04-02 02:40:39 -07:00
committed by GitHub
Unverified
parent fd253c0b0e
commit 62595b233f
10 changed files with 1092 additions and 826 deletions
@@ -1,148 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import json
import os
from typing import Annotated, Any, cast
from agent_framework import Agent, Message, tool
from agent_framework.foundry import FoundryChatClient
from agent_framework.orchestrations import SequentialBuilder
from azure.identity import AzureCliCredential
from dotenv import load_dotenv
from pydantic import Field
# Load environment variables from .env file
load_dotenv()
"""
Sample: Workflow kwargs Flow to @tool Tools
This sample demonstrates how to flow custom context (skill data, user tokens, etc.)
through any workflow pattern to @tool functions using the **kwargs pattern.
Key Concepts:
- Pass custom context as kwargs when invoking workflow.run()
- kwargs are stored in State and passed to all agent invocations
- @tool functions receive kwargs via **kwargs parameter
- Works with Sequential, Concurrent, GroupChat, Handoff, and Magentic patterns
Prerequisites:
- FOUNDRY_PROJECT_ENDPOINT must be your Azure AI Foundry Agent Service (V2) project endpoint.
- FOUNDRY_MODEL must be set to your Azure OpenAI model deployment name.
"""
# Define tools that accept custom context via **kwargs
# NOTE: approval_mode="never_require" is for sample brevity. Use "always_require" in production;
# see samples/02-agents/tools/function_tool_with_approval.py
# and samples/02-agents/tools/function_tool_with_approval_and_sessions.py.
@tool(approval_mode="never_require")
def get_user_data(
query: Annotated[str, Field(description="What user data to retrieve")],
**kwargs: Any,
) -> str:
"""Retrieve user-specific data based on the authenticated context."""
user_token = kwargs.get("user_token", {})
user_name = user_token.get("user_name", "anonymous")
access_level = user_token.get("access_level", "none")
print(f"\n[get_user_data] Received kwargs keys: {list(kwargs.keys())}")
print(f"[get_user_data] User: {user_name}")
print(f"[get_user_data] Access level: {access_level}")
return f"Retrieved data for user {user_name} with {access_level} access: {query}"
@tool(approval_mode="never_require")
def call_api(
endpoint_name: Annotated[str, Field(description="Name of the API endpoint to call")],
**kwargs: Any,
) -> str:
"""Call an API using the configured endpoints from custom_data."""
custom_data = kwargs.get("custom_data", {})
api_config = custom_data.get("api_config", {})
base_url = api_config.get("base_url", "unknown")
endpoints = api_config.get("endpoints", {})
print(f"\n[call_api] Received kwargs keys: {list(kwargs.keys())}")
print(f"[call_api] Base URL: {base_url}")
print(f"[call_api] Available endpoints: {list(endpoints.keys())}")
if endpoint_name in endpoints:
return f"Called {base_url}{endpoints[endpoint_name]} successfully"
return f"Endpoint '{endpoint_name}' not found in configuration"
async def main() -> None:
print("=" * 70)
print("Workflow kwargs Flow Demo (SequentialBuilder)")
print("=" * 70)
# Create chat client
client = FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["FOUNDRY_MODEL"],
credential=AzureCliCredential(),
)
# Create agent with tools that use kwargs
agent = Agent(
client=client,
name="assistant",
instructions=(
"You are a helpful assistant. Use the available tools to help users. "
"When asked about user data, use get_user_data. "
"When asked to call an API, use call_api."
),
tools=[get_user_data, call_api],
)
# Build a simple sequential workflow
workflow = SequentialBuilder(participants=[agent]).build()
# Define custom context that will flow to tools via kwargs
custom_data = {
"api_config": {
"base_url": "https://api.example.com",
"endpoints": {
"users": "/v1/users",
"orders": "/v1/orders",
"products": "/v1/products",
},
},
}
user_token = {
"user_name": "bob@contoso.com",
"access_level": "admin",
}
print("\nCustom Data being passed:")
print(json.dumps(custom_data, indent=2))
print(f"\nUser: {user_token['user_name']}")
print("\n" + "-" * 70)
print("Workflow Execution (watch for [tool_name] logs showing kwargs received):")
print("-" * 70)
# Run workflow with kwargs - these will flow through to tools
async for event in workflow.run(
"Please get my user data and then call the users API endpoint.",
additional_function_arguments={"custom_data": custom_data, "user_token": user_token},
stream=True,
):
if event.type == "output":
output_data = cast(list[Message], event.data)
if isinstance(output_data, list):
for item in output_data:
if isinstance(item, Message) and item.text:
print(f"\n[Final Answer]: {item.text}")
print("\n" + "=" * 70)
print("Sample Complete")
print("=" * 70)
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,170 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import json
import os
from typing import Annotated, Any, cast
from agent_framework import Agent, Message, tool
from agent_framework.foundry import FoundryChatClient
from agent_framework.orchestrations import SequentialBuilder
from azure.identity import AzureCliCredential
from dotenv import load_dotenv
from pydantic import Field
# Load environment variables from .env file
load_dotenv()
"""
Sample: Global Workflow kwargs
This sample demonstrates how to pass the same kwargs to every agent in a
workflow using global targeting. When keys in function_invocation_kwargs do NOT
match any executor ID (agent name), the framework treats them as global and
delivers them to all agents.
Compare with workflow_kwargs_per_agent.py which targets kwargs to specific agents.
Key Concepts:
- Global function_invocation_kwargs are delivered to every agent in the workflow
- Useful when all agents share the same credentials, config, or context
- @tool functions receive kwargs via the **kwargs parameter
Prerequisites:
- FOUNDRY_PROJECT_ENDPOINT must be your Azure AI Foundry Agent Service (V2) project endpoint.
- Environment variables configured
"""
# 1. Define a tool for the research agent — queries a company's internal
# database using credentials passed via global kwargs.
# NOTE: approval_mode="never_require" is for sample brevity. Use "always_require" in production;
# see samples/02-agents/tools/function_tool_with_approval.py
# and samples/02-agents/tools/function_tool_with_approval_and_sessions.py.
@tool(approval_mode="never_require")
def query_company_database(
query: Annotated[
str, Field(description="The database query to run, e.g. 'Q3 revenue' or 'headcount by department'")
],
**kwargs: Any,
) -> str:
"""Query the company's internal database for business metrics and data."""
db_config = kwargs.get("db_config", {})
connection_string = db_config.get("connection_string", "")
database = db_config.get("database", "")
if not connection_string or not database:
return f"ERROR: missing db_config — cannot run query '{query}'"
print(f"\n [query_company_database] Connecting to {database} at {connection_string[:30]}...")
# Simulated company data that the LLM would not know on its own
return (
f"Query results from {database}:\n"
f"- Contoso Q3 2025 revenue: $47.2M (up 12% YoY)\n"
f"- Top product line: CloudSync Pro ($18.6M)\n"
f"- Engineering headcount: 342 (up from 298 in Q2)\n"
f"- Customer churn rate: 4.1% (down from 5.3% in Q2)\n"
f"- Net new enterprise customers: 28"
)
# 2. Define a tool for the writer agent — retrieves the formatting style
# from user preferences passed via global kwargs.
@tool(approval_mode="never_require")
def get_formatting_instructions(
section_title: Annotated[str, Field(description="The title of the section or report to format")],
**kwargs: Any,
) -> str:
"""Get the formatting instructions based on user preferences."""
user_prefs = kwargs.get("user_preferences", {})
output_format = user_prefs.get("format", "plain")
language = user_prefs.get("language", "en")
print(f"\n [get_formatting_instructions] Format: {output_format}, Language: {language}")
return (
f"Formatting rules for '{section_title}':\n"
f"- Output format: {output_format}\n"
f"- Language/locale: {language}\n"
f"- Include a footer: 'Generated in {output_format} for locale {language}'"
)
async def main() -> None:
print("=" * 70)
print("Global Workflow kwargs Demo")
print("=" * 70)
# 3. Create a shared chat client.
client = FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["FOUNDRY_MODEL"],
credential=AzureCliCredential(),
)
# 4. Create two agents with different tools and responsibilities.
researcher = Agent(
client=client,
name="researcher",
instructions=(
"You are a data analyst. Call query_company_database exactly once "
"with the user's request as the query. Return the raw results."
),
tools=[query_company_database],
)
writer = Agent(
client=client,
name="writer",
instructions=(
"You are a report writer. Call get_formatting_instructions exactly once, "
"then rewrite the data you receive into a polished report following those rules."
),
tools=[get_formatting_instructions],
)
# 5. Build a sequential workflow: researcher -> writer.
workflow = SequentialBuilder(participants=[researcher, writer]).build()
# 6. Define global kwargs — every agent receives all of these.
# Because the keys ("db_config", "user_preferences") do NOT match any
# executor ID ("researcher", "writer"), the framework treats them as
# global and delivers the full dict to every agent.
global_fi_kwargs = {
"db_config": {
"connection_string": "Server=contoso-sql.database.windows.net;Database=metrics",
"database": "contoso_metrics_prod",
},
"user_preferences": {
"format": "markdown",
"language": "en-US",
},
}
print("\nGlobal function_invocation_kwargs (sent to all agents):")
print(json.dumps(global_fi_kwargs, indent=2))
print("\n" + "-" * 70)
print("Workflow Execution:")
print("-" * 70)
# 7. Run the workflow — every agent receives the same global kwargs.
async for event in workflow.run(
"Pull Contoso's Q3 2025 performance data and write an executive summary.",
function_invocation_kwargs=global_fi_kwargs,
stream=True,
):
if event.type == "output":
output_data = cast(list[Message], event.data)
if isinstance(output_data, list):
for item in output_data:
if isinstance(item, Message) and item.text:
print(f"\n[{item.author_name}]: {item.text}")
print("\n" + "=" * 70)
print("Sample Complete")
print("=" * 70)
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,222 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import json
import os
from typing import Annotated, Any, cast
from agent_framework import Agent, Message, tool
from agent_framework.foundry import FoundryChatClient
from agent_framework.orchestrations import SequentialBuilder
from azure.identity import AzureCliCredential
from dotenv import load_dotenv
from pydantic import Field
# Load environment variables from .env file
load_dotenv()
"""
Sample: Per-Agent Workflow kwargs
This sample demonstrates how to pass different kwargs to different agents in a
workflow using per-agent targeting. When keys in function_invocation_kwargs (or
client_kwargs) match executor IDs (agent names by default), each agent
receives only its own slice of the kwargs.
Key Concepts:
- Per-agent function_invocation_kwargs target specific agents by executor ID
- Agents only receive the kwargs assigned to them (not other agents' kwargs)
- Useful when different agents need different credentials, configs, or context
Prerequisites:
- FOUNDRY_PROJECT_ENDPOINT must be your Azure AI Foundry Agent Service (V2) project endpoint.
- Environment variables configured
"""
# 1. Define a tool for the research agent — queries a company's internal
# database using credentials passed via per-agent kwargs.
# NOTE: approval_mode="never_require" is for sample brevity. Use "always_require" in production;
# see samples/02-agents/tools/function_tool_with_approval.py
# and samples/02-agents/tools/function_tool_with_approval_and_sessions.py.
@tool(approval_mode="never_require")
def query_company_database(
query: Annotated[
str, Field(description="The database query to run, e.g. 'Q3 revenue' or 'headcount by department'")
],
**kwargs: Any,
) -> str:
"""Query the company's internal database for business metrics and data."""
db_config = kwargs.get("db_config", {})
connection_string = db_config.get("connection_string", "")
database = db_config.get("database", "")
if not connection_string or not database:
return f"ERROR: missing db_config — cannot run query '{query}'"
print(f"\n [query_company_database] Connecting to {database} at {connection_string[:30]}...")
# Simulated company data that the LLM would not know on its own
return (
f"Query results from {database}:\n"
f"- Contoso Q3 2025 revenue: $47.2M (up 12% YoY)\n"
f"- Top product line: CloudSync Pro ($18.6M)\n"
f"- Engineering headcount: 342 (up from 298 in Q2)\n"
f"- Customer churn rate: 4.1% (down from 5.3% in Q2)\n"
f"- Net new enterprise customers: 28"
)
# 2. Define a tool for the writer agent — retrieves the formatting style
# from user preferences passed via per-agent kwargs.
@tool(approval_mode="never_require")
def get_formatting_instructions(
section_title: Annotated[str, Field(description="The title of the section or report to format")],
**kwargs: Any,
) -> str:
"""Get the formatting instructions based on user preferences."""
user_prefs = kwargs.get("user_preferences", {})
output_format = user_prefs.get("format", "plain")
language = user_prefs.get("language", "en")
print(f"\n [get_formatting_instructions] Format: {output_format}, Language: {language}")
return (
f"Formatting rules for '{section_title}':\n"
f"- Output format: {output_format}\n"
f"- Language/locale: {language}\n"
f"- Include a footer: 'Generated in {output_format} for locale {language}'"
)
async def main() -> None:
print("=" * 70)
print("Per-Agent Workflow kwargs Demo")
print("=" * 70)
# 3. Create a shared chat client.
client = FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["FOUNDRY_MODEL"],
credential=AzureCliCredential(),
)
# 4. Create two agents with different tools and responsibilities.
researcher = Agent(
client=client,
name="researcher",
instructions=(
"You are a data analyst. Call query_company_database exactly once "
"with the user's request as the query. Return the raw results."
),
tools=[query_company_database],
)
writer = Agent(
client=client,
name="writer",
instructions=(
"You are a report writer. Call get_formatting_instructions exactly once, "
"then rewrite the data you receive into a polished report following those rules."
),
tools=[get_formatting_instructions],
)
# 5. Build a sequential workflow: researcher -> writer.
workflow = SequentialBuilder(participants=[researcher, writer]).build()
# 6. Define per-agent kwargs — each agent gets only its own config.
# The keys ("researcher", "writer") match the agent names, which are
# used as executor IDs by default.
per_agent_fi_kwargs = {
"researcher": {
"db_config": {
"connection_string": "Server=contoso-sql.database.windows.net;Database=metrics",
"database": "contoso_metrics_prod",
},
},
"writer": {
"user_preferences": {
"format": "markdown",
"language": "en-US",
},
},
}
print("\nPer-agent function_invocation_kwargs:")
print(json.dumps(per_agent_fi_kwargs, indent=2))
print("\n" + "-" * 70)
print("Workflow Execution:")
print("-" * 70)
# 7. Run the workflow — each agent receives only its targeted kwargs.
async for event in workflow.run(
"Pull Contoso's Q3 2025 performance data and write an executive summary.",
function_invocation_kwargs=per_agent_fi_kwargs,
stream=True,
):
if event.type == "output":
output_data = cast(list[Message], event.data)
if isinstance(output_data, list):
for item in output_data:
if isinstance(item, Message) and item.text:
print(f"\n[{item.author_name}]: {item.text}")
print("\n" + "=" * 70)
print("Sample Complete")
print("=" * 70)
if __name__ == "__main__":
asyncio.run(main())
"""
Sample output:
Per-agent function_invocation_kwargs:
{
"researcher": {
"db_config": {
"connection_string": "Server=contoso-sql.database.windows.net;Database=metrics",
"database": "contoso_metrics_prod"
}
},
"writer": {
"user_preferences": {
"format": "markdown",
"language": "en-US"
}
}
}
----------------------------------------------------------------------
Workflow Execution:
----------------------------------------------------------------------
[query_company_database] Connecting to contoso_metrics_prod at Server=contoso-sql.database.wi...
[researcher]: Here is Contoso's Q3 2025 data:
- Revenue: $47.2M (up 12% YoY)
- Top product: CloudSync Pro ($18.6M)
- Engineering headcount: 342
- Churn rate: 4.1%
- Net new enterprise customers: 28
[get_formatting_instructions] Format: markdown, Language: en-US
[writer]: # Contoso Q3 2025 Executive Summary
| Metric | Value |
|---|---|
| Revenue | $47.2M (+12% YoY) |
| Top Product | CloudSync Pro ($18.6M) |
| Engineering Headcount | 342 |
| Customer Churn | 4.1% |
| New Enterprise Customers | 28 |
Generated in markdown for locale en-US
======================================================================
Sample Complete
======================================================================
"""