Python: semantic-kernel to agent-framework migration code samples (#1045)

* wip migrations

* Wip: workflow migrations

* Add migration samples for sk to af

* Fix typo

* Fixes
This commit is contained in:
Evan Mattson
2025-10-01 16:02:03 +09:00
committed by GitHub
Unverified
parent 498fc06fd6
commit fb51d917fd
23 changed files with 1817 additions and 5 deletions
@@ -0,0 +1,46 @@
# Copyright (c) Microsoft. All rights reserved.
# Semantic Kernel → Microsoft Agent Framework Migration Samples
This gallery helps Semantic Kernel (SK) developers move to the Microsoft Agent Framework (AF) with minimal guesswork. Each script pairs SK code with its AF equivalent so you can compare primitives, tooling, and orchestration patterns side by side while you migrate production workloads.
## Whats Included
- `chat_completion/` SK `ChatCompletionAgent` scenarios and their AF `ChatAgent` counterparts (basic chat, tooling, threading/streaming).
- `azure_ai_agent/` Remote Azure AI agent examples, including hosted code interpreter and explicit thread reuse.
- `openai_assistant/` Assistants API migrations covering basic usage, code interpreter, and custom function tools.
- `openai_responses/` Responses API parity samples with tooling and structured JSON output.
- `copilot_studio/` Copilot Studio agent parity, tools, and streaming examples.
- `orchestrations/` Sequential, Concurrent, and Magentic workflow migrations that mirror SK Team abstractions.
- `processes/` Fan-out/fan-in and nested process examples that contrast SKs Process Framework with AF workflows.
Each script is fully async and the `main()` routine runs both implementations back to back so you can observe their outputs in a single execution.
## Prerequisites
- Python 3.10 or later.
- Access to the necessary model endpoints (Azure OpenAI, OpenAI, Azure AI, Copilot Studio, etc.).
- Installed SDKs: `semantic-kernel` and the Microsoft Agent Framework (`pip install semantic-kernel agent-framework`), or the repos editable packages if you are developing locally.
- Service credentials exposed through environment variables (for example `OPENAI_API_KEY`, `AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_KEY`, or Copilot Studio auth settings).
## Running Single-Agent Samples
From the repository root:
```
python samantic-kernel-migration/chat_completion/01_basic_chat_completion.py
```
Every script accepts no CLI arguments and will first call the SK implementation, followed by the AF version. Adjust the prompt or credentials inside the file as necessary before running.
## Running Orchestration & Workflow Samples
Advanced comparisons are split between `samantic-kernel-migration/orchestrations` (Sequential, Concurrent, Magentic) and `samantic-kernel-migration/processes` (fan-out/fan-in, nested). You can run them directly, or isolate dependencies in a throwaway virtual environment:
```
cd samantic-kernel-migration
uv venv --python 3.10 .venv-migration
source .venv-migration/bin/activate
uv pip install semantic-kernel agent-framework
uv run python orchestrations/sequential.py
uv run python processes/fan_out_fan_in_process.py
```
Swap the script path for any other workflow or process sample. Deactivate the sandbox with `deactivate` when you are finished.
## Tips for Migration
- Keep the original SK sample open while iterating on the AF equivalent; the code is intentionally formatted so you can copy/paste across SDKs.
- Threads/conversation state are explicit in AF. When porting SK code that relies on implicit thread reuse, call `agent.get_new_thread()` and pass it into each `run`/`run_stream` call.
- Tools map cleanly: SK `@kernel_function` plugins translate to AF `@ai_function` callables. Hosted tools (code interpreter, web search, MCP) are available only in AF—introduce them once parity is achieved.
- For multi-agent orchestration, AF workflows expose checkpoints and resume capabilities that SK Process/Team abstractions do not. Use the workflow samples as a blueprint when modernizing complex agent graphs.
@@ -0,0 +1,50 @@
# Copyright (c) Microsoft. All rights reserved.
"""Create an Azure AI agent using both Semantic Kernel and Agent Framework.
Prerequisites:
- Azure AI agent resource with a deployed model.
- Logged-in Azure CLI or other credential supported by AzureCliCredential.
"""
import asyncio
async def run_semantic_kernel() -> None:
from azure.identity.aio import AzureCliCredential
from semantic_kernel.agents import AzureAIAgent, AzureAIAgentSettings
async with AzureCliCredential() as credential:
async with AzureAIAgent.create_client(credential=credential) as client:
settings = AzureAIAgentSettings() # Reads env vars for region/deployment.
# SK builds the remote agent definition then wraps it with AzureAIAgent.
definition = await client.agents.create_agent(
model=settings.model_deployment_name,
name="Support",
instructions="Answer customer questions in one paragraph.",
)
agent = AzureAIAgent(client=client, definition=definition)
response = await agent.get_response("How do I upgrade my plan?")
print("[SK]", response.message.content)
async def run_agent_framework() -> None:
from azure.identity.aio import AzureCliCredential
from agent_framework.azure import AzureAIAgentClient
async with AzureCliCredential() as credential:
async with AzureAIAgentClient(async_credential=credential).create_agent(
name="Support",
instructions="Answer customer questions in one paragraph.",
) as agent:
# AF client returns an asynchronous context manager for remote agents.
reply = await agent.run("How do I upgrade my plan?")
print("[AF]", reply.text)
async def main() -> None:
await run_semantic_kernel()
await run_agent_framework()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,57 @@
# Copyright (c) Microsoft. All rights reserved.
"""Enable the hosted code interpreter for Azure AI agents in SK and AF.
The Azure AI service natively executes the code interpreter tool. Provide the
resource details via AzureAIAgentSettings (SK) or environment variables consumed
by AzureAIAgentClient (AF).
"""
import asyncio
async def run_semantic_kernel() -> None:
from azure.identity.aio import AzureCliCredential
from semantic_kernel.agents import AzureAIAgent, AzureAIAgentSettings
async with AzureCliCredential() as credential:
async with AzureAIAgent.create_client(credential=credential) as client:
settings = AzureAIAgentSettings()
# Register the hosted code interpreter tool with the remote agent.
definition = await client.agents.create_agent(
model=settings.model_deployment_name,
name="Analyst",
instructions="Use the code interpreter for numeric work.",
tools=[{"type": "code_interpreter"}],
)
agent = AzureAIAgent(client=client, definition=definition)
response = await agent.get_response(
"Use Python to compute 42 ** 2 and explain the result.",
)
print("[SK]", response.message.content)
async def run_agent_framework() -> None:
from azure.identity.aio import AzureCliCredential
from agent_framework.azure import AzureAIAgentClient, HostedCodeInterpreterTool
async with AzureCliCredential() as credential:
async with AzureAIAgentClient(async_credential=credential).create_agent(
name="Analyst",
instructions="Use the code interpreter for numeric work.",
tools=[HostedCodeInterpreterTool()],
) as agent:
# HostedCodeInterpreterTool mirrors the built-in Azure AI capability.
reply = await agent.run(
"Use Python to compute 42 ** 2 and explain the result.",
tool_choice="auto",
)
print("[AF]", reply.text)
async def main() -> None:
await run_semantic_kernel()
await run_agent_framework()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,66 @@
# Copyright (c) Microsoft. All rights reserved.
"""Maintain Azure AI agent conversation state across turns in SK and AF."""
import asyncio
async def run_semantic_kernel() -> None:
from azure.identity.aio import AzureCliCredential
from semantic_kernel.agents import AzureAIAgent, AzureAIAgentSettings, AzureAIAgentThread
async with AzureCliCredential() as credential:
async with AzureAIAgent.create_client(credential=credential) as client:
settings = AzureAIAgentSettings()
definition = await client.agents.create_agent(
model=settings.model_deployment_name,
name="Planner",
instructions="Track follow-up questions within the same thread.",
)
agent = AzureAIAgent(client=client, definition=definition)
thread: AzureAIAgentThread | None = None
# SK returns the updated AzureAIAgentThread on each response.
first = await agent.get_response("Outline the onboarding checklist.", thread=thread)
thread = first.thread
print("[SK][turn1]", first.message.content)
second = await agent.get_response(
"Highlight the items that require legal review.",
thread=thread,
)
print("[SK][turn2]", second.message.content)
if thread is not None:
print("[SK][thread-id]", thread.id)
async def run_agent_framework() -> None:
from azure.identity.aio import AzureCliCredential
from agent_framework.azure import AzureAIAgentClient
async with AzureCliCredential() as credential:
async with AzureAIAgentClient(async_credential=credential).create_agent(
name="Planner",
instructions="Track follow-up questions within the same thread.",
) as agent:
thread = agent.get_new_thread()
# AF threads are explicit and can be serialized for external storage.
first = await agent.run("Outline the onboarding checklist.", thread=thread)
print("[AF][turn1]", first.text)
second = await agent.run(
"Highlight the items that require legal review.",
thread=thread,
)
print("[AF][turn2]", second.text)
serialized = await thread.serialize()
print("[AF][thread-json]", serialized)
async def main() -> None:
await run_semantic_kernel()
await run_agent_framework()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,46 @@
# Copyright (c) Microsoft. All rights reserved.
"""Basic SK ChatCompletionAgent vs Agent Framework ChatAgent.
Both samples expect OpenAI-compatible environment variables (OPENAI_API_KEY or
Azure OpenAI configuration). Update the prompts or client wiring to match your
model of choice before running.
"""
import asyncio
async def run_semantic_kernel() -> None:
"""Call SK's ChatCompletionAgent for a simple question."""
from semantic_kernel.agents import ChatCompletionAgent
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion
# SK agent holds the thread state internally via ChatCompletionAgent.
agent = ChatCompletionAgent(
service=OpenAIChatCompletion(),
name="Support",
instructions="Answer in one sentence.",
)
response = await agent.get_response(messages="How do I reset my bike tire?")
print("[SK]", response.message.content)
async def run_agent_framework() -> None:
"""Call Agent Framework's ChatAgent created from OpenAIChatClient."""
from agent_framework.openai import OpenAIChatClient
# AF constructs a lightweight ChatAgent backed by OpenAIChatClient.
chat_agent = OpenAIChatClient().create_agent(
name="Support",
instructions="Answer in one sentence.",
)
reply = await chat_agent.run("How do I reset my bike tire?")
print("[AF]", reply.text)
async def main() -> None:
await run_semantic_kernel()
await run_agent_framework()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,65 @@
# Copyright (c) Microsoft. All rights reserved.
"""Demonstrate SK plugins vs Agent Framework tools with a chat agent.
Configure your OpenAI or Azure OpenAI credentials before running. The example
exposes a "specials" tool that both SDKs call during the conversation.
"""
import asyncio
async def run_semantic_kernel() -> None:
from semantic_kernel.agents import ChatCompletionAgent, ChatHistoryAgentThread
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion
from semantic_kernel.functions import kernel_function
class SpecialsPlugin:
@kernel_function(name="specials", description="List daily specials")
def specials(self) -> str:
return "Clam chowder, Cobb salad, Chai tea"
# SK advertises tools by attaching plugin instances at construction time.
agent = ChatCompletionAgent(
service=OpenAIChatCompletion(),
name="Host",
instructions="Answer menu questions accurately.",
plugins=[SpecialsPlugin()],
)
thread = ChatHistoryAgentThread()
response = await agent.get_response(
messages="What soup can I order today?",
thread=thread,
)
print("[SK]", response.message.content)
async def run_agent_framework() -> None:
from agent_framework._tools import ai_function
from agent_framework.openai import OpenAIChatClient
@ai_function(name="specials", description="List daily specials")
async def specials() -> str:
return "Clam chowder, Cobb salad, Chai tea"
# AF tools are provided as callables on each agent instance.
chat_agent = OpenAIChatClient().create_agent(
name="Host",
instructions="Answer menu questions accurately.",
tools=[specials],
)
thread = chat_agent.get_new_thread()
reply = await chat_agent.run(
"What soup can I order today?",
thread=thread,
tool_choice="auto",
)
print("[AF]", reply.text)
async def main() -> None:
await run_semantic_kernel()
await run_agent_framework()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,71 @@
# Copyright (c) Microsoft. All rights reserved.
"""Compare conversation threading and streaming responses for chat agents.
Both implementations reuse a conversation thread across turns and stream output
for the second turn.
"""
import asyncio
async def run_semantic_kernel() -> None:
from semantic_kernel.agents import ChatCompletionAgent, ChatHistoryAgentThread
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion
# SK thread object keeps the conversation history on the agent side.
agent = ChatCompletionAgent(
service=OpenAIChatCompletion(),
name="Writer",
instructions="Keep answers short and friendly.",
)
thread = ChatHistoryAgentThread()
first = await agent.get_response(
messages="Suggest a catchy headline for our product launch.",
thread=thread,
)
print("[SK]", first.message.content)
print("[SK][stream]", end=" ")
async for update in agent.invoke_stream(
messages="Draft a 2 sentence blurb.",
thread=thread,
):
if update.message:
print(update.message.content, end="", flush=True)
print()
async def run_agent_framework() -> None:
from agent_framework.openai import OpenAIChatClient
# AF thread objects are requested explicitly from the agent.
chat_agent = OpenAIChatClient().create_agent(
name="Writer",
instructions="Keep answers short and friendly.",
)
thread = chat_agent.get_new_thread()
first = await chat_agent.run(
"Suggest a catchy headline for our product launch.",
thread=thread,
)
print("[AF]", first.text)
print("[AF][stream]", end=" ")
async for chunk in chat_agent.run_stream(
"Draft a 2 sentence blurb.",
thread=thread,
):
if chunk.text:
print(chunk.text, end="", flush=True)
print()
async def main() -> None:
await run_semantic_kernel()
await run_agent_framework()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,37 @@
# Copyright (c) Microsoft. All rights reserved.
"""Call a Copilot Studio agent with SK and Agent Framework."""
import asyncio
async def run_semantic_kernel() -> None:
from semantic_kernel.agents import CopilotStudioAgent
# SK agent talks to the configured Copilot Studio bot directly.
agent = CopilotStudioAgent(
name="PhysicsAgent",
instructions="Answer physics questions concisely.",
)
response = await agent.get_response("Why is the sky blue?")
print("[SK]", response.message.content)
async def run_agent_framework() -> None:
from agent_framework.microsoft import CopilotStudioAgent
# AF exposes an equivalent CopilotStudioAgent wrapper.
agent = CopilotStudioAgent(
name="PhysicsAgent",
instructions="Answer physics questions concisely.",
)
reply = await agent.run("Why is the sky blue?")
print("[AF]", reply.text)
async def main() -> None:
await run_semantic_kernel()
await run_agent_framework()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,43 @@
# Copyright (c) Microsoft. All rights reserved.
"""Stream responses from Copilot Studio agents in SK and AF."""
import asyncio
async def run_semantic_kernel() -> None:
from semantic_kernel.agents import CopilotStudioAgent
agent = CopilotStudioAgent(
name="TourGuide",
instructions="Provide travel recommendations in short bursts.",
)
# SK streaming yields chunks with message metadata.
print("[SK][stream]", end=" ")
async for chunk in agent.invoke_stream("Plan a day in Copenhagen for foodies."):
if chunk.message:
print(chunk.message.content, end="", flush=True)
print()
async def run_agent_framework() -> None:
from agent_framework.microsoft import CopilotStudioAgent
agent = CopilotStudioAgent(
name="TourGuide",
instructions="Provide travel recommendations in short bursts.",
)
# AF streaming provides incremental AgentRunResponseUpdate objects.
print("[AF][stream]", end=" ")
async for update in agent.run_stream("Plan a day in Copenhagen for foodies."):
if update.text:
print(update.text, end="", flush=True)
print()
async def main() -> None:
await run_semantic_kernel()
await run_agent_framework()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,55 @@
# Copyright (c) Microsoft. All rights reserved.
"""Create an OpenAI Assistant using SK and Agent Framework."""
import asyncio
import os
ASSISTANT_MODEL = os.environ.get("OPENAI_ASSISTANT_MODEL", "gpt-4o-mini")
async def run_semantic_kernel() -> None:
from semantic_kernel.agents import AssistantAgentThread, OpenAIAssistantAgent
client = OpenAIAssistantAgent.create_client()
# Provision the assistant on the OpenAI Assistants service.
definition = await client.beta.assistants.create(
model=ASSISTANT_MODEL,
name="Helper",
instructions="Answer questions in one concise paragraph.",
)
agent = OpenAIAssistantAgent(client=client, definition=definition)
thread: AssistantAgentThread | None = None
response = await agent.get_response("What is the capital of Denmark?", thread=thread)
thread = response.thread
print("[SK]", response.message.content)
if thread is not None:
print("[SK][thread-id]", thread.id)
async def run_agent_framework() -> None:
from agent_framework.openai import OpenAIAssistantsClient
assistants_client = OpenAIAssistantsClient()
# AF wraps the assistant lifecycle with an async context manager.
async with assistants_client.create_agent(
name="Helper",
instructions="Answer questions in one concise paragraph.",
model=ASSISTANT_MODEL,
) as assistant_agent:
reply = await assistant_agent.run("What is the capital of Denmark?")
print("[AF]", reply.text)
follow_up = await assistant_agent.run(
"How many residents live there?",
thread=assistant_agent.get_new_thread(),
)
print("[AF][follow-up]", follow_up.text)
async def main() -> None:
await run_semantic_kernel()
await run_agent_framework()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,55 @@
# Copyright (c) Microsoft. All rights reserved.
"""Enable the code interpreter tool for OpenAI Assistants in SK and AF."""
import asyncio
async def run_semantic_kernel() -> None:
from semantic_kernel.agents import OpenAIAssistantAgent
from semantic_kernel.connectors.ai.open_ai import OpenAISettings
client = OpenAIAssistantAgent.create_client()
code_interpreter_tool, code_interpreter_tool_resources = OpenAIAssistantAgent.configure_code_interpreter_tool()
# Enable the hosted code interpreter tool on the assistant definition.
definition = await client.beta.assistants.create(
model=OpenAISettings().chat_deployment_name,
name="CodeRunner",
instructions="Run the provided request as code and return the result.",
tools=code_interpreter_tool,
tool_resources=code_interpreter_tool_resources,
)
agent = OpenAIAssistantAgent(client=client, definition=definition)
response = await agent.get_response(
"Use Python to calculate the mean of [41, 42, 45] and explain the steps.",
)
print(f"[SK]: {response}")
async def run_agent_framework() -> None:
from agent_framework import HostedCodeInterpreterTool
from agent_framework.openai import OpenAIAssistantsClient
assistants_client = OpenAIAssistantsClient()
# AF exposes the same tool configuration via create_agent.
async with assistants_client.create_agent(
name="CodeRunner",
instructions="Use the code interpreter when calculations are required.",
model="gpt-4.1",
tools=[HostedCodeInterpreterTool()],
) as assistant_agent:
response = await assistant_agent.run(
"Use Python to calculate the mean of [41, 42, 45] and explain the steps.",
tool_choice="auto",
)
print(f"[AF]: {response.text}")
async def main() -> None:
await run_semantic_kernel()
await run_agent_framework()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,89 @@
# Copyright (c) Microsoft. All rights reserved.
"""Implement a function tool for OpenAI Assistants in SK and AF."""
import asyncio
import os
from typing import Any
ASSISTANT_MODEL = os.environ.get("OPENAI_ASSISTANT_MODEL", "gpt-4o-mini")
async def fake_weather_lookup(city: str, day: str) -> dict[str, Any]:
"""Pretend to call a weather service."""
return {
"city": city,
"day": day,
"forecast": "Sunny with scattered clouds",
"high_c": 22,
"low_c": 14,
}
async def run_semantic_kernel() -> None:
from semantic_kernel.agents import AssistantAgentThread, OpenAIAssistantAgent
from semantic_kernel.functions import kernel_function
class WeatherPlugin:
@kernel_function(name="get_forecast", description="Look up the forecast for a city and day.")
async def fake_weather_lookup(city: str, day: str) -> dict[str, Any]:
"""Pretend to call a weather service."""
return {
"city": city,
"day": day,
"forecast": "Sunny with scattered clouds",
"high_c": 22,
"low_c": 14,
}
client = OpenAIAssistantAgent.create_client()
# Tool schema is registered on the assistant definition.
definition = await client.beta.assistants.create(
model=ASSISTANT_MODEL,
name="WeatherHelper",
instructions="Call get_forecast to fetch weather details.",
plugins=[WeatherPlugin()],
)
agent = OpenAIAssistantAgent(client=client, definition=definition)
thread: AssistantAgentThread | None = None
response = await agent.get_response(
"What will the weather be like in Seattle tomorrow?",
thread=thread,
)
thread = response.thread
print("[SK][initial]", response.message.content)
async def run_agent_framework() -> None:
from agent_framework._tools import ai_function
from agent_framework.openai import OpenAIAssistantsClient
@ai_function(
name="get_forecast",
description="Look up the forecast for a city and day.",
)
async def get_forecast(city: str, day: str) -> dict[str, Any]:
return await fake_weather_lookup(city, day)
assistants_client = OpenAIAssistantsClient()
# AF converts the decorated function into an assistant-compatible tool.
async with assistants_client.create_agent(
name="WeatherHelper",
instructions="Call get_forecast to fetch weather details.",
model=ASSISTANT_MODEL,
tools=[get_forecast],
) as assistant_agent:
reply = await assistant_agent.run(
"What will the weather be like in Seattle tomorrow?",
tool_choice="auto",
)
print("[AF]", reply.text)
async def main() -> None:
await run_semantic_kernel()
await run_agent_framework()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,48 @@
# Copyright (c) Microsoft. All rights reserved.
"""Issue a basic Responses API call using SK and Agent Framework."""
import asyncio
async def run_semantic_kernel() -> None:
from azure.identity import AzureCliCredential
from semantic_kernel.agents import AzureResponsesAgent
from semantic_kernel.connectors.ai.open_ai import AzureOpenAISettings
credential = AzureCliCredential()
try:
client = AzureResponsesAgent.create_client(credential=credential)
# SK response agents wrap Azure OpenAI's hosted Responses API.
agent = AzureResponsesAgent(
ai_model_id=AzureOpenAISettings().responses_deployment_name,
client=client,
instructions="Answer in one concise sentence.",
name="Expert",
)
response = await agent.get_response("Why is the sky blue?")
print("[SK]", response.message.content)
finally:
await credential.close()
async def run_agent_framework() -> None:
from agent_framework import ChatAgent
from agent_framework.openai import OpenAIResponsesClient
# AF ChatAgent can swap in an OpenAIResponsesClient directly.
chat_agent = ChatAgent(
chat_client=OpenAIResponsesClient(),
instructions="Answer in one concise sentence.",
name="Expert",
)
reply = await chat_agent.run("Why is the sky blue?")
print("[AF]", reply.text)
async def main() -> None:
await run_semantic_kernel()
await run_agent_framework()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,61 @@
# Copyright (c) Microsoft. All rights reserved.
"""Attach a lightweight function tool to the Responses API in SK and AF."""
import asyncio
async def run_semantic_kernel() -> None:
from azure.identity import AzureCliCredential
from semantic_kernel.agents import AzureResponsesAgent
from semantic_kernel.connectors.ai.open_ai import AzureOpenAISettings
from semantic_kernel.functions import kernel_function
class MathPlugin:
@kernel_function(name="add", description="Add two numbers")
def add(self, a: float, b: float) -> float:
return a + b
credential = AzureCliCredential()
try:
client = AzureResponsesAgent.create_client(credential=credential)
# Plugins advertise callable tools to the Responses agent.
agent = AzureResponsesAgent(
ai_model_id=AzureOpenAISettings().responses_deployment_name,
client=client,
instructions="Use the add tool when math is required.",
name="MathExpert",
plugins=[MathPlugin()],
)
response = await agent.get_response("Use add(41, 1) and explain the result.")
print("[SK]", response.message.content)
finally:
await credential.close()
async def run_agent_framework() -> None:
from agent_framework import ChatAgent
from agent_framework._tools import ai_function
from agent_framework.openai import OpenAIResponsesClient
@ai_function(name="add", description="Add two numbers")
async def add(a: float, b: float) -> float:
return a + b
chat_agent = ChatAgent(
chat_client=OpenAIResponsesClient(),
instructions="Use the add tool when math is required.",
name="MathExpert",
# AF registers the async function as a tool at construction.
tools=[add],
)
reply = await chat_agent.run("Use add(41, 1) and explain the result.")
print("[AF]", reply.text)
async def main() -> None:
await run_semantic_kernel()
await run_agent_framework()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,63 @@
# Copyright (c) Microsoft. All rights reserved.
"""Request structured JSON output from the Responses API in SK and AF."""
import asyncio
from pydantic import BaseModel
class ReleaseBrief(BaseModel):
feature: str
benefit: str
launch_date: str
async def run_semantic_kernel() -> None:
from azure.identity import AzureCliCredential
from semantic_kernel.agents import AzureResponsesAgent
from semantic_kernel.connectors.ai.open_ai import AzureOpenAISettings
credential = AzureCliCredential()
try:
client = AzureResponsesAgent.create_client(credential=credential)
# response_format requests schema-constrained output from the model.
agent = AzureResponsesAgent(
ai_model_id=AzureOpenAISettings().responses_deployment_name,
client=client,
instructions="Return launch briefs as structured JSON.",
name="ProductMarketer",
text=AzureResponsesAgent.configure_response_format(ReleaseBrief),
)
response = await agent.get_response(
"Draft a launch brief for the Contoso Note app.",
response_format=ReleaseBrief,
)
print("[SK]", response.message.content)
finally:
await credential.close()
async def run_agent_framework() -> None:
from agent_framework import ChatAgent
from agent_framework.openai import OpenAIResponsesClient
chat_agent = ChatAgent(
chat_client=OpenAIResponsesClient(),
instructions="Return launch briefs as structured JSON.",
name="ProductMarketer",
)
# AF forwards the same response_format payload at invocation time.
reply = await chat_agent.run(
"Draft a launch brief for the Contoso Note app.",
response_format=ReleaseBrief,
)
print("[AF]", reply.text)
async def main() -> None:
await run_semantic_kernel()
await run_agent_framework()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,123 @@
# Copyright (c) Microsoft. All rights reserved.
"""Side-by-side concurrent orchestrations for Agent Framework and Semantic Kernel."""
import asyncio
from collections.abc import Sequence
from typing import cast
from agent_framework import ChatMessage, ConcurrentBuilder, WorkflowOutputEvent
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
from semantic_kernel.agents import Agent, ChatCompletionAgent, ConcurrentOrchestration
from semantic_kernel.agents.runtime import InProcessRuntime
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.contents import ChatMessageContent
PROMPT = "Explain the concept of temperature from multiple scientific perspectives."
######################################################################
# Semantic Kernel orchestration path
######################################################################
def build_semantic_kernel_agents() -> list[Agent]:
credential = AzureCliCredential()
physics_agent = ChatCompletionAgent(
name="PhysicsExpert",
instructions=("You are an expert in physics. Answer questions from a physics perspective."),
service=AzureChatCompletion(credential=credential),
)
chemistry_agent = ChatCompletionAgent(
name="ChemistryExpert",
instructions=("You are an expert in chemistry. Answer questions from a chemistry perspective."),
service=AzureChatCompletion(credential=credential),
)
return [physics_agent, chemistry_agent]
async def run_semantic_kernel_example(prompt: str) -> Sequence[ChatMessageContent]:
concurrent_orchestration = ConcurrentOrchestration(members=build_semantic_kernel_agents())
runtime = InProcessRuntime()
runtime.start()
try:
orchestration_result = await concurrent_orchestration.invoke(task=prompt, runtime=runtime)
final_value = await orchestration_result.get(timeout=60)
if isinstance(final_value, ChatMessageContent):
return [final_value]
if isinstance(final_value, Sequence):
return list(final_value)
return []
finally:
await runtime.stop_when_idle()
def _print_semantic_kernel_outputs(outputs: Sequence[ChatMessageContent]) -> None:
if not outputs:
print("No Semantic Kernel output.")
return
print("===== Semantic Kernel Concurrent =====")
for item in outputs:
content = item.content or ""
print(f"# {item.name}\n{content}\n")
######################################################################
# Agent Framework orchestration path
######################################################################
async def run_agent_framework_example(prompt: str) -> Sequence[list[ChatMessage]]:
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
physics = chat_client.create_agent(
instructions=("You are an expert in physics. Answer questions from a physics perspective."),
name="physics",
)
chemistry = chat_client.create_agent(
instructions=("You are an expert in chemistry. Answer questions from a chemistry perspective."),
name="chemistry",
)
workflow = ConcurrentBuilder().participants([physics, chemistry]).build()
outputs: list[list[ChatMessage]] = []
async for event in workflow.run_stream(prompt):
if isinstance(event, WorkflowOutputEvent):
outputs.append(cast(list[ChatMessage], event.data))
return outputs
def _print_agent_framework_outputs(conversations: Sequence[Sequence[ChatMessage]]) -> None:
if not conversations:
print("No Agent Framework output.")
return
print("===== Agent Framework Concurrent =====")
for index, conversation in enumerate(conversations, start=1):
print(f"--- Conversation {index} ---")
for message in conversation:
name = message.author_name or "assistant"
print(f"[{name}] {message.text}")
print()
async def main() -> None:
agent_framework_outputs = await run_agent_framework_example(PROMPT)
_print_agent_framework_outputs(agent_framework_outputs)
semantic_kernel_outputs = await run_semantic_kernel_example(PROMPT)
_print_semantic_kernel_outputs(semantic_kernel_outputs)
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,172 @@
# Copyright (c) Microsoft. All rights reserved.
"""Side-by-side Magentic orchestrations for Agent Framework and Semantic Kernel."""
import asyncio
from collections.abc import Sequence
from typing import cast
from agent_framework import ChatAgent, HostedCodeInterpreterTool, MagenticBuilder, WorkflowOutputEvent
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient
from semantic_kernel.agents import (
Agent,
ChatCompletionAgent,
MagenticOrchestration,
OpenAIAssistantAgent,
StandardMagenticManager,
)
from semantic_kernel.agents.runtime import InProcessRuntime
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion, OpenAISettings
from semantic_kernel.contents import ChatMessageContent
PROMPT = (
"I am preparing a report on the energy efficiency of different machine learning model architectures. "
"Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
"on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
"Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 VM "
"for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model per task type "
"(image classification, text classification, and text generation)."
)
######################################################################
# Semantic Kernel orchestration path
######################################################################
async def build_semantic_kernel_agents() -> list[Agent]:
research_agent = ChatCompletionAgent(
name="ResearchAgent",
description="A helpful assistant with access to web search. Ask it to perform web searches.",
instructions=(
"You are a Researcher. You find information without additional computation or quantitative analysis."
),
service=OpenAIChatCompletion(ai_model_id="gpt-4o-search-preview"),
)
client = OpenAIAssistantAgent.create_client()
code_interpreter_tool, code_interpreter_tool_resources = OpenAIAssistantAgent.configure_code_interpreter_tool()
openai_settings = OpenAISettings()
model_id = openai_settings.chat_model_id if openai_settings.chat_model_id else "gpt-5"
definition = await client.beta.assistants.create(
model=model_id,
name="CoderAgent",
description="A helpful assistant that writes and executes code to process and analyze data.",
instructions="You solve questions using code. Please provide detailed analysis and computation process.",
tools=code_interpreter_tool,
tool_resources=code_interpreter_tool_resources,
)
coder_agent = OpenAIAssistantAgent(
client=client,
definition=definition,
)
return [research_agent, coder_agent]
def sk_agent_response_callback(
message: ChatMessageContent | Sequence[ChatMessageContent],
) -> None:
if isinstance(message, ChatMessageContent):
messages: Sequence[ChatMessageContent] = [message]
elif isinstance(message, Sequence) and not isinstance(message, (str, bytes)):
messages = [item for item in message if isinstance(item, ChatMessageContent)]
else:
messages = []
for item in messages:
content = item.content or ""
print(f"**{item.name}**\n{content}\n")
async def run_semantic_kernel_example(prompt: str) -> Sequence[ChatMessageContent]:
agents = await build_semantic_kernel_agents()
magentic_orchestration = MagenticOrchestration(
members=agents,
manager=StandardMagenticManager(chat_completion_service=OpenAIChatCompletion()),
agent_response_callback=sk_agent_response_callback,
)
runtime = InProcessRuntime()
runtime.start()
try:
orchestration_result = await magentic_orchestration.invoke(task=prompt, runtime=runtime)
value = await orchestration_result.get()
if isinstance(value, ChatMessageContent):
return [value]
if isinstance(value, Sequence) and not isinstance(value, (str, bytes)):
return [item for item in value if isinstance(item, ChatMessageContent)]
return []
finally:
await runtime.stop_when_idle()
def _print_semantic_kernel_outputs(outputs: Sequence[ChatMessageContent]) -> None:
if not outputs:
print("No Semantic Kernel output.")
return
print("===== Semantic Kernel Magentic =====")
for item in outputs:
content = item.content or ""
print(f"**{item.name}**\n{content}\n")
######################################################################
# Agent Framework orchestration path
######################################################################
async def run_agent_framework_example(prompt: str) -> str | None:
researcher = ChatAgent(
name="ResearcherAgent",
description="Specialist in research and information gathering",
instructions=(
"You are a Researcher. You find information without additional computation or quantitative analysis."
),
chat_client=OpenAIChatClient(ai_model_id="gpt-4o-search-preview"),
)
coder = ChatAgent(
name="CoderAgent",
description="A helpful assistant that writes and executes code to process and analyze data.",
instructions="You solve questions using code. Please provide detailed analysis and computation process.",
chat_client=OpenAIResponsesClient(),
tools=HostedCodeInterpreterTool(),
)
workflow = (
MagenticBuilder()
.participants(researcher=researcher, coder=coder)
.with_standard_manager(chat_client=OpenAIChatClient())
.build()
)
final_text: str | None = None
async for event in workflow.run_stream(prompt):
if isinstance(event, WorkflowOutputEvent):
final_text = cast(str, event.data)
return final_text
def _print_agent_framework_output(result: str | None) -> None:
if result is None:
print("No Agent Framework output.")
return
print("===== Agent Framework Magentic =====")
print(result)
async def main() -> None:
agent_framework_result = await run_agent_framework_example(PROMPT)
_print_agent_framework_output(agent_framework_result)
semantic_kernel_outputs = await run_semantic_kernel_example(PROMPT)
_print_semantic_kernel_outputs(semantic_kernel_outputs)
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,127 @@
# Copyright (c) Microsoft. All rights reserved.
"""Side-by-side sequential orchestrations for Agent Framework and Semantic Kernel."""
import asyncio
from collections.abc import Sequence
from typing import cast
from agent_framework import ChatMessage, Role, SequentialBuilder, WorkflowOutputEvent
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
from semantic_kernel.agents import Agent, ChatCompletionAgent, SequentialOrchestration
from semantic_kernel.agents.runtime import InProcessRuntime
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.contents import ChatMessageContent
PROMPT = "Write a tagline for a budget-friendly eBike."
######################################################################
# Semantic Kernel orchestration path
######################################################################
def build_semantic_kernel_agents() -> list[Agent]:
credential = AzureCliCredential()
writer_agent = ChatCompletionAgent(
name="WriterAgent",
instructions=("You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."),
service=AzureChatCompletion(credential=credential),
)
reviewer_agent = ChatCompletionAgent(
name="ReviewerAgent",
instructions=("You are a thoughtful reviewer. Give brief feedback on the previous assistant message."),
service=AzureChatCompletion(credential=credential),
)
return [writer_agent, reviewer_agent]
async def sk_agent_response_callback(
message: ChatMessageContent | Sequence[ChatMessageContent],
) -> None:
if isinstance(message, ChatMessageContent):
messages: Sequence[ChatMessageContent] = [message]
elif isinstance(message, Sequence) and not isinstance(message, (str, bytes)):
messages = list(message)
else:
messages = [cast(ChatMessageContent, message)]
for item in messages:
content = item.content or ""
print(f"# {item.name}\n{content}\n")
######################################################################
# Agent Framework orchestration path
######################################################################
async def run_agent_framework_example(prompt: str) -> list[ChatMessage]:
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
writer = chat_client.create_agent(
instructions=("You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."),
name="writer",
)
reviewer = chat_client.create_agent(
instructions=("You are a thoughtful reviewer. Give brief feedback on the previous assistant message."),
name="reviewer",
)
workflow = SequentialBuilder().participants([writer, reviewer]).build()
conversation_outputs: list[list[ChatMessage]] = []
async for event in workflow.run_stream(prompt):
if isinstance(event, WorkflowOutputEvent):
conversation_outputs.append(cast(list[ChatMessage], event.data))
return conversation_outputs[-1] if conversation_outputs else []
async def run_semantic_kernel_example(prompt: str) -> str:
sequential_orchestration = SequentialOrchestration(
members=build_semantic_kernel_agents(),
agent_response_callback=sk_agent_response_callback,
)
runtime = InProcessRuntime()
runtime.start()
try:
orchestration_result = await sequential_orchestration.invoke(task=prompt, runtime=runtime)
final_message = await orchestration_result.get(timeout=20)
if isinstance(final_message, ChatMessageContent):
return final_message.content or ""
return str(final_message)
finally:
await runtime.stop_when_idle()
def _format_conversation(conversation: list[ChatMessage]) -> None:
if not conversation:
print("No Agent Framework output.")
return
print("===== Agent Framework Sequential =====")
for index, message in enumerate(conversation, start=1):
name = message.author_name or ("assistant" if message.role == Role.ASSISTANT else "user")
print(f"{'-' * 60}\n{index:02d} [{name}]\n{message.text}")
print()
async def main() -> None:
conversation = await run_agent_framework_example(PROMPT)
_format_conversation(conversation)
print("===== Semantic Kernel Sequential =====")
final_text = await run_semantic_kernel_example(PROMPT)
print(final_text)
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,252 @@
# Copyright (c) Microsoft. All rights reserved.
"""Side-by-side sample comparing Semantic Kernel Process Framework and Agent Framework workflows."""
import asyncio
import logging
from dataclasses import dataclass
from enum import Enum
from typing import TYPE_CHECKING, ClassVar, cast
######################################################################
# region Agent Framework imports
######################################################################
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from pydantic import BaseModel, Field
######################################################################
# region Semantic Kernel imports
######################################################################
from semantic_kernel import Kernel
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion
from semantic_kernel.functions import kernel_function
from semantic_kernel.processes.kernel_process.kernel_process_event import KernelProcessEvent
from semantic_kernel.processes.kernel_process.kernel_process_step import KernelProcessStep
from semantic_kernel.processes.kernel_process.kernel_process_step_context import KernelProcessStepContext
from semantic_kernel.processes.kernel_process.kernel_process_step_state import KernelProcessStepState
from semantic_kernel.processes.process_builder import ProcessBuilder
if TYPE_CHECKING:
from semantic_kernel.processes.kernel_process import KernelProcess
from semantic_kernel.processes.local_runtime.local_kernel_process import LocalKernelProcessContext
async def _start_local_kernel_process(
*,
process: "KernelProcess",
kernel: Kernel,
initial_event: KernelProcessEvent | str | Enum,
**kwargs: object,
) -> "LocalKernelProcessContext":
from semantic_kernel.processes.local_runtime.local_kernel_process import start as start_local_kernel_process
return await start_local_kernel_process(
process=process,
kernel=kernel,
initial_event=initial_event,
**kwargs,
)
logging.basicConfig(level=logging.WARNING)
class CommonEvents(Enum):
"""Common events for both samples."""
USER_INPUT_RECEIVED = "UserInputReceived"
COMPLETION_RESPONSE_GENERATED = "CompletionResponseGenerated"
WELCOME_DONE = "WelcomeDone"
A_STEP_DONE = "AStepDone"
B_STEP_DONE = "BStepDone"
C_STEP_DONE = "CStepDone"
START_A_REQUESTED = "StartARequested"
START_B_REQUESTED = "StartBRequested"
EXIT_REQUESTED = "ExitRequested"
START_PROCESS = "StartProcess"
######################################################################
# region Semantic Kernel Process Framework path
######################################################################
class KickOffStep(KernelProcessStep[None]):
KICK_OFF_FUNCTION: ClassVar[str] = "kick_off"
@kernel_function(name=KICK_OFF_FUNCTION)
async def print_welcome_message(self, context: KernelProcessStepContext):
await context.emit_event(process_event=CommonEvents.START_A_REQUESTED, data="Get Going A")
await context.emit_event(process_event=CommonEvents.START_B_REQUESTED, data="Get Going B")
class AStep(KernelProcessStep[None]):
@kernel_function()
async def do_it(self, context: KernelProcessStepContext):
await asyncio.sleep(1)
await context.emit_event(process_event=CommonEvents.A_STEP_DONE.value, data="I did A")
class BStep(KernelProcessStep[None]):
@kernel_function()
async def do_it(self, context: KernelProcessStepContext):
await asyncio.sleep(2)
await context.emit_event(process_event=CommonEvents.B_STEP_DONE.value, data="I did B")
class CStepState(BaseModel):
current_cycle: int = 0
class CStep(KernelProcessStep[CStepState]):
state: CStepState = Field(default_factory=CStepState)
async def activate(self, state: KernelProcessStepState[CStepState]):
self.state = state.state
@kernel_function()
async def do_it(self, context: KernelProcessStepContext, astepdata: str, bstepdata: str):
self.state.current_cycle += 1
print(f"CStep Current Cycle: {self.state.current_cycle}")
if self.state.current_cycle == 3:
print("CStep Exit Requested")
await context.emit_event(process_event=CommonEvents.EXIT_REQUESTED.value)
return
await context.emit_event(process_event=CommonEvents.C_STEP_DONE.value)
kernel = Kernel()
async def run_semantic_kernel_process_example() -> None:
kernel.add_service(OpenAIChatCompletion(service_id="default"))
process = ProcessBuilder(name="Process Framework Sample")
kickoff_step = process.add_step(step_type=KickOffStep)
step_a = process.add_step(step_type=AStep)
step_b = process.add_step(step_type=BStep)
step_c = process.add_step(step_type=CStep)
process.on_input_event(event_id=CommonEvents.START_PROCESS.value).send_event_to(target=kickoff_step)
kickoff_step.on_event(event_id=CommonEvents.START_A_REQUESTED.value).send_event_to(target=step_a)
kickoff_step.on_event(event_id=CommonEvents.START_B_REQUESTED.value).send_event_to(target=step_b)
step_a.on_event(event_id=CommonEvents.A_STEP_DONE.value).send_event_to(target=step_c, parameter_name="astepdata")
step_b.on_event(event_id=CommonEvents.B_STEP_DONE.value).send_event_to(target=step_c, parameter_name="bstepdata")
step_c.on_event(event_id=CommonEvents.C_STEP_DONE.value).send_event_to(target=kickoff_step)
step_c.on_event(event_id=CommonEvents.EXIT_REQUESTED.value).stop_process()
kernel_process: "KernelProcess" = process.build()
async with await _start_local_kernel_process(
process=kernel_process,
kernel=kernel,
initial_event=KernelProcessEvent(id=CommonEvents.START_PROCESS.value, data="Initial"),
) as process_context:
process_state = await process_context.get_state()
c_step_state: KernelProcessStepState[CStepState] | None = next(
(s.state for s in process_state.steps if s.state.name == "CStep"),
None,
)
if c_step_state is None or c_step_state.state is None:
raise RuntimeError("CStep state unavailable")
assert c_step_state.state.current_cycle == 3 # nosec
print(f"Final State Check: CStepState current cycle: {c_step_state.state.current_cycle}")
######################################################################
# region Agent Framework workflow path
######################################################################
@dataclass
class StepResult:
origin: str
cycle: int
data: str
class KickOffExecutor(Executor):
def __init__(self, *, id: str = "kickoff") -> None:
super().__init__(id=id)
self._next_cycle = 0
@handler
async def handle(self, event: CommonEvents, ctx: WorkflowContext[int]) -> None:
if event not in {CommonEvents.START_PROCESS, CommonEvents.C_STEP_DONE}:
return
self._next_cycle += 1
await ctx.send_message(self._next_cycle)
class DelayedStepExecutor(Executor):
def __init__(self, *, name: str, delay_seconds: float) -> None:
super().__init__(id=name)
self._delay = delay_seconds
self._name = name
@handler
async def handle(self, cycle: int, ctx: WorkflowContext[StepResult]) -> None:
await asyncio.sleep(self._delay)
await ctx.send_message(StepResult(origin=self._name, cycle=cycle, data=f"I did {self._name.upper()[-1]}"))
class FanInExecutor(Executor):
def __init__(self, *, required_cycles: int = 3, id: str = "fanin") -> None:
super().__init__(id=id)
self._completed_cycles = 0
self._required_cycles = required_cycles
@handler
async def handle(self, results: list[StepResult], ctx: WorkflowContext[CommonEvents, str]) -> None:
if not results:
return
cycle_number = results[0].cycle
summary = ", ".join(f"{r.origin}: {r.data}" for r in results)
print(f"Cycle {cycle_number} aggregate -> {summary}")
self._completed_cycles += 1
if self._completed_cycles >= self._required_cycles:
await ctx.yield_output(f"Completed {self._completed_cycles} cycles")
return
await ctx.send_message(CommonEvents.C_STEP_DONE)
async def run_agent_framework_workflow_example() -> str | None:
kickoff = KickOffExecutor()
step_a = DelayedStepExecutor(name="step_a", delay_seconds=1)
step_b = DelayedStepExecutor(name="step_b", delay_seconds=2)
aggregate = FanInExecutor(required_cycles=3)
workflow = (
WorkflowBuilder()
.add_edge(kickoff, step_a)
.add_edge(kickoff, step_b)
.add_fan_in_edges([step_a, step_b], aggregate)
.add_edge(aggregate, kickoff)
.set_start_executor(kickoff)
.build()
)
final_text: str | None = None
async for event in workflow.run_stream(CommonEvents.START_PROCESS):
if isinstance(event, WorkflowOutputEvent):
final_text = cast(str, event.data)
return final_text
async def main() -> None:
print("===== Agent Framework Workflow =====")
af_result = await run_agent_framework_workflow_example()
if af_result:
print(af_result)
else:
print("No Agent Framework output.")
print("===== Semantic Kernel Process Framework =====")
await run_semantic_kernel_process_example()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,279 @@
# Copyright (c) Microsoft. All rights reserved.
"""Nested process comparison between Semantic Kernel Process Framework and Agent Framework sub-workflows."""
import asyncio
import logging
from collections.abc import Sequence
from dataclasses import dataclass
from enum import Enum
from typing import ClassVar, cast
######################################################################
# region Agent Framework imports
######################################################################
from agent_framework import (
Executor,
WorkflowBuilder,
WorkflowContext,
WorkflowExecutor,
WorkflowOutputEvent,
handler,
)
from pydantic import BaseModel, Field
######################################################################
# region Semantic Kernel imports
######################################################################
from semantic_kernel import Kernel
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion
from semantic_kernel.functions import kernel_function
from semantic_kernel.processes.kernel_process.kernel_process import KernelProcess
from semantic_kernel.processes.kernel_process.kernel_process_event import KernelProcessEventVisibility
from semantic_kernel.processes.kernel_process.kernel_process_step import KernelProcessStep
from semantic_kernel.processes.kernel_process.kernel_process_step_context import KernelProcessStepContext
from semantic_kernel.processes.kernel_process.kernel_process_step_state import KernelProcessStepState
from semantic_kernel.processes.local_runtime.local_kernel_process import start
from semantic_kernel.processes.process_builder import ProcessBuilder
from typing_extensions import Never
######################################################################
# endregion
######################################################################
logging.basicConfig(level=logging.WARNING)
class ProcessEvents(Enum):
START_PROCESS = "StartProcess"
START_INNER_PROCESS = "StartInnerProcess"
OUTPUT_READY_PUBLIC = "OutputReadyPublic"
OUTPUT_READY_INTERNAL = "OutputReadyInternal"
######################################################################
# region Semantic Kernel nested process path
######################################################################
class StepState(BaseModel):
last_message: str | None = None
class EchoStep(KernelProcessStep[None]):
ECHO: ClassVar[str] = "echo"
@kernel_function(name=ECHO)
async def echo(self, message: str) -> str:
print(f"[ECHO] {message}")
return message
class RepeatStep(KernelProcessStep[StepState]):
REPEAT: ClassVar[str] = "repeat"
state: StepState = Field(default_factory=StepState)
async def activate(self, state: KernelProcessStepState[StepState]):
self.state = state.state
@kernel_function(name=REPEAT)
async def repeat(
self,
message: str,
context: KernelProcessStepContext,
count: int = 2,
) -> None:
output = " ".join([message] * count)
self.state.last_message = output
print(f"[REPEAT] {output}")
await context.emit_event(
process_event=ProcessEvents.OUTPUT_READY_PUBLIC.value,
data=output,
visibility=KernelProcessEventVisibility.Public,
)
await context.emit_event(
process_event=ProcessEvents.OUTPUT_READY_INTERNAL.value,
data=output,
visibility=KernelProcessEventVisibility.Internal,
)
def _create_linear_process(name: str) -> ProcessBuilder:
process_builder = ProcessBuilder(name=name)
echo_step = process_builder.add_step(step_type=EchoStep)
repeat_step = process_builder.add_step(step_type=RepeatStep)
process_builder.on_input_event(event_id=ProcessEvents.START_PROCESS.value).send_event_to(target=echo_step)
echo_step.on_function_result(function_name=EchoStep.ECHO).send_event_to(
target=repeat_step,
parameter_name="message",
)
return process_builder
_semantic_kernel = Kernel()
async def run_semantic_kernel_nested_process() -> None:
_semantic_kernel.add_service(OpenAIChatCompletion(service_id="default"))
process_builder = _create_linear_process("Outer")
nested_process_step = process_builder.add_step_from_process(_create_linear_process("Inner"))
process_builder.steps[1].on_event(ProcessEvents.OUTPUT_READY_INTERNAL.value).send_event_to(
nested_process_step.where_input_event_is(ProcessEvents.START_PROCESS.value)
)
kernel_process = process_builder.build()
process_handle = await start(
process=kernel_process,
kernel=_semantic_kernel,
initial_event=ProcessEvents.START_PROCESS.value,
data="Test",
)
process_info = await process_handle.get_state()
inner_process: KernelProcess | None = next(
(s for s in process_info.steps if s.state.name == "Inner"),
None,
)
if inner_process is None:
raise RuntimeError("Inner process state missing")
repeat_state: KernelProcessStepState[StepState] | None = next(
(s.state for s in inner_process.steps if s.state.name == "RepeatStep"),
None,
)
if repeat_state is None or repeat_state.state is None:
raise RuntimeError("RepeatStep state missing")
assert repeat_state.state.last_message == "Test Test Test Test" # nosec
######################################################################
# region Agent Framework nested workflow path
######################################################################
@dataclass
class RepeatPayload:
message: str
count: int = 2
class KickoffExecutor(Executor):
def __init__(self) -> None:
super().__init__(id="kickoff")
@handler
async def start(self, message: str, ctx: WorkflowContext[RepeatPayload]) -> None:
print(f"[OUTER] Start with message: {message}")
await ctx.send_message(RepeatPayload(message=message, count=2))
class OuterEchoExecutor(Executor):
def __init__(self) -> None:
super().__init__(id="outer_echo")
@handler
async def echo(self, payload: RepeatPayload, ctx: WorkflowContext[RepeatPayload]) -> None:
print(f"[OUTER ECHO] {payload.message}")
await ctx.send_message(payload)
class OuterRepeatExecutor(Executor):
def __init__(self, *, inner_target_id: str) -> None:
super().__init__(id="outer_repeat")
self._inner_target_id = inner_target_id
@handler
async def repeat(self, payload: RepeatPayload, ctx: WorkflowContext[RepeatPayload]) -> None:
repeated = " ".join([payload.message] * payload.count)
print(f"[OUTER REPEAT] {repeated}")
await ctx.send_message(RepeatPayload(message=repeated, count=2), target_id=self._inner_target_id)
class InnerEchoExecutor(Executor):
def __init__(self) -> None:
super().__init__(id="inner_echo")
@handler
async def echo(self, payload: RepeatPayload, ctx: WorkflowContext[RepeatPayload]) -> None:
print(f" [INNER ECHO] {payload.message}")
await ctx.send_message(payload)
class InnerRepeatExecutor(Executor):
def __init__(self) -> None:
super().__init__(id="inner_repeat")
@handler
async def repeat(self, payload: RepeatPayload, ctx: WorkflowContext[Never, str]) -> None:
repeated = " ".join([payload.message] * payload.count)
print(f" [INNER REPEAT] {repeated}")
await ctx.yield_output(repeated)
class CollectResultExecutor(Executor):
def __init__(self) -> None:
super().__init__(id="collector")
@handler
async def collect(self, result: str, ctx: WorkflowContext[Never, str]) -> None:
print(f"[COLLECTOR] Final result -> {result}")
await ctx.yield_output(result)
def _build_inner_workflow() -> WorkflowExecutor:
inner_echo = InnerEchoExecutor()
inner_repeat = InnerRepeatExecutor()
inner_workflow = WorkflowBuilder().set_start_executor(inner_echo).add_edge(inner_echo, inner_repeat).build()
return WorkflowExecutor(inner_workflow, id="inner_workflow")
async def run_agent_framework_nested_workflow(initial_message: str) -> Sequence[str]:
inner_executor = _build_inner_workflow()
kickoff = KickoffExecutor()
outer_echo = OuterEchoExecutor()
outer_repeat = OuterRepeatExecutor(inner_target_id=inner_executor.id)
collector = CollectResultExecutor()
outer_workflow = (
WorkflowBuilder()
.set_start_executor(kickoff)
.add_edge(kickoff, outer_echo)
.add_edge(outer_echo, outer_repeat)
.add_edge(outer_repeat, inner_executor)
.add_edge(inner_executor, collector)
.build()
)
results: list[str] = []
async for event in outer_workflow.run_stream(initial_message):
if isinstance(event, WorkflowOutputEvent):
results.append(cast(str, event.data))
return results
######################################################################
# endregion
######################################################################
async def main() -> None:
print("===== Agent Framework Nested Workflow =====")
af_results = await run_agent_framework_nested_workflow("Test")
for index, value in enumerate(af_results, start=1):
print(f"Result {index}: {value}")
print("\n===== Semantic Kernel Nested Process =====")
await run_semantic_kernel_nested_process()
if __name__ == "__main__":
asyncio.run(main())