Files
Eduard van Valkenburg a2856d3b92 Python: restructure: Python samples into progressive 01-05 layout (#3862)
* restructure: Python samples into progressive 01-05 layout

- 01-get-started/: 6 numbered steps (hello agent → hosting)
- 02-agents/: all agent concept samples (tools, middleware, providers, etc.)
- 03-workflows/: ALL existing workflow samples preserved as-is
- 04-hosting/: azure-functions, durabletask, a2a
- 05-end-to-end/: demos, evaluation, hosted agents
- Old files moved to _to_delete/ for review
- Added AGENTS.md with structure documentation
- autogen-migration/ and semantic-kernel-migration/ preserved at root

* fix: switch to AzureOpenAI Foundry, fix CI failures

- Switch all 01-get-started samples to AzureOpenAIResponsesClient with
  Azure AI Foundry project endpoint (AZURE_AI_PROJECT_ENDPOINT +
  AZURE_OPENAI_RESPONSES_DEPLOYMENT_NAME + AzureCliCredential)
- Add _to_delete/ and 05-end-to-end/ to pyrightconfig.samples.json excludes
- Fix test paths in packages/ that referenced old getting_started/ dirs:
  durabletask conftest + streaming test, azurefunctions conftest,
  devui conftest + capture_messages + openai_sdk_integration
- Fix workflow_as_agent_human_in_the_loop.py import (sibling import)
- Update hosting READMEs and tool comment paths
- Replace root README.md with new structure overview
- Update AGENTS.md to document Azure OpenAI Foundry as default provider

* cleanup: remove _to_delete folder, copy resource files to active dirs

All files in _to_delete/ were either:
- Exact duplicates of files in the new structure (240 files)
- Same file with only comment path updates (100 files)
- One import-fix diff (workflow_as_agent_human_in_the_loop.py)
- One superseded minimal_sample.py

Resource files (sample.pdf, countries.json, employees.pdf, weather.json)
copied to 02-agents/sample_assets/ and 02-agents/resources/ since active
samples reference them.

* fix: address PR review comments, centralize resources, remove root duplicates

- Fix type annotation in 04_memory.py (string union -> proper types)
- Fix old sample paths in observability files
- Fix grammar/spelling in observability samples
- Move sample_assets/ and resources/ to shared/ folder
- Remove 8 duplicate observability files from 02-agents root
- Update resource path references in multimodal_input and provider samples

* fix: update broken links from old getting_started paths to new structure

- Update relative paths in READMEs: getting_started/ → 01-get-started/,
  02-agents/, 03-workflows/, 04-hosting/, 05-end-to-end/
- Fix absolute GitHub URLs in package READMEs
- Fix broken link in ollama package README

* fix: convert absolute GitHub URLs to relative paths for link checker

Absolute URLs to python/samples/ on main branch 404 until PR merges.
Converted to relative paths that linkspector can verify locally.

* fix: update link for handoff sample moved to orchestrations/

* fix: update chatkit-integration README path from demos/ to 05-end-to-end/

* fix: update broken links in orchestrations README to match flat directory structure
2026-02-12 17:36:36 +00:00

207 lines
6.9 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
import asyncio
from collections.abc import AsyncIterable, Awaitable
from typing import Any

from agent_framework import (
    AgentResponse,
    AgentResponseUpdate,
    AgentThread,
    BaseAgent,
    Content,
    Message,
    Role,
    normalize_messages,
)
"""
Custom Agent Implementation Example
This sample demonstrates implementing a custom agent by extending BaseAgent class,
showing the minimal requirements for both streaming and non-streaming responses.
"""
class EchoAgent(BaseAgent):
    """A simple custom agent that echoes user messages with a prefix.

    This demonstrates how to create a fully custom agent by extending BaseAgent
    and implementing the required run() method with stream support.
    """

    # Prefix prepended to every echoed reply; overridden per instance in __init__.
    echo_prefix: str = "Echo: "

    def __init__(
        self,
        *,
        name: str | None = None,
        description: str | None = None,
        echo_prefix: str = "Echo: ",
        **kwargs: Any,
    ) -> None:
        """Initialize the EchoAgent.

        Args:
            name: The name of the agent.
            description: The description of the agent.
            echo_prefix: The prefix to add to echoed messages.
            **kwargs: Additional keyword arguments passed to BaseAgent.
        """
        super().__init__(
            name=name,
            description=description,
            echo_prefix=echo_prefix,  # type: ignore
            **kwargs,
        )
        # NOTE(review): assign explicitly so the configured prefix never depends on
        # how BaseAgent treats keyword arguments it does not declare; without this,
        # a base class that ignores the kwarg would leave the class default in place.
        # Confirm against BaseAgent.__init__.
        self.echo_prefix = echo_prefix

    def run(
        self,
        messages: str | Message | list[str] | list[Message] | None = None,
        *,
        stream: bool = False,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> "AsyncIterable[AgentResponseUpdate] | Awaitable[AgentResponse]":
        """Execute the agent and return a response.

        This method is intentionally not ``async``: it hands back either a
        coroutine (to be awaited) or an async generator (to be iterated).

        Args:
            messages: The message(s) to process.
            stream: If True, return an async iterable of updates. If False, return an awaitable response.
            thread: The conversation thread (optional).
            **kwargs: Additional keyword arguments.

        Returns:
            When stream=False: An awaitable AgentResponse containing the agent's reply.
            When stream=True: An async iterable of AgentResponseUpdate objects.
        """
        if stream:
            return self._run_stream(messages=messages, thread=thread, **kwargs)
        return self._run(messages=messages, thread=thread, **kwargs)

    def _build_response_text(self, normalized_messages: list[Message]) -> str:
        """Return the reply text for already-normalized input messages.

        Shared by the streaming and non-streaming paths so both produce the
        same text for the same input.

        Args:
            normalized_messages: The input messages after normalize_messages().

        Returns:
            A greeting when there is no input; otherwise the prefix followed by
            the text of the last message, or by a placeholder when that message
            has no text.
        """
        if not normalized_messages:
            return "Hello! I'm a custom echo agent. Send me a message and I'll echo it back."

        # For simplicity, echo only the last message
        last_message = normalized_messages[-1]
        if last_message.text:
            return f"{self.echo_prefix}{last_message.text}"
        return f"{self.echo_prefix}[Non-text message received]"

    async def _run(
        self,
        messages: str | Message | list[str] | list[Message] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentResponse:
        """Non-streaming implementation.

        Args:
            messages: The message(s) to process.
            thread: The conversation thread to notify of the exchange (optional).
            **kwargs: Additional keyword arguments (unused here).

        Returns:
            An AgentResponse holding a single assistant message.
        """
        # Normalize input messages to a list
        normalized_messages = normalize_messages(messages)

        response_message = Message(
            role=Role.ASSISTANT,
            contents=[Content.from_text(text=self._build_response_text(normalized_messages))],
        )

        # Notify the thread of new messages if provided
        if thread is not None:
            await self._notify_thread_of_new_messages(thread, normalized_messages, response_message)

        return AgentResponse(messages=[response_message])

    async def _run_stream(
        self,
        messages: str | Message | list[str] | list[Message] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentResponseUpdate]:
        """Streaming implementation.

        Args:
            messages: The message(s) to process.
            thread: The conversation thread to notify once streaming completes (optional).
            **kwargs: Additional keyword arguments (unused here).

        Yields:
            AgentResponseUpdate objects whose text chunks concatenate to the full reply.
        """
        # Normalize input messages to a list
        normalized_messages = normalize_messages(messages)
        response_text = self._build_response_text(normalized_messages)

        # Simulate streaming word by word. Split on single spaces (not split())
        # so repeated spaces, tabs and newlines survive and the chunks rejoin to
        # exactly the text that is recorded on the thread below.
        for index, piece in enumerate(response_text.split(" ")):
            # Re-attach the separating space to every piece except the first
            chunk_text = f" {piece}" if index > 0 else piece
            if not chunk_text:
                # Only possible for the first piece when the text starts with a space
                continue
            yield AgentResponseUpdate(
                contents=[Content.from_text(text=chunk_text)],
                role=Role.ASSISTANT,
            )
            # Small delay to simulate streaming
            await asyncio.sleep(0.1)

        # Notify the thread of the complete response if provided
        if thread is not None:
            complete_response = Message(role=Role.ASSISTANT, contents=[Content.from_text(text=response_text)])
            await self._notify_thread_of_new_messages(thread, normalized_messages, complete_response)
async def main() -> None:
    """Run the EchoAgent demo: one plain call, one streamed call, then a threaded conversation."""
    print("=== Custom Agent Example ===\n")

    # Build the agent with a custom prefix
    print("--- EchoAgent Example ---")
    agent = EchoAgent(
        name="EchoBot",
        description="A simple agent that echoes messages with a prefix",
        echo_prefix="🔊 Echo: ",
    )

    # Non-streaming call: await the full response
    print(f"Agent Name: {agent.name}")
    print(f"Agent ID: {agent.id}")

    user_text = "Hello, custom agent!"
    print(f"\nUser: {user_text}")
    reply = await agent.run(user_text)
    print(f"Agent: {reply.messages[0].text}")

    # Streaming call: print each chunk as it arrives, then end the line
    stream_text = "This is a streaming test"
    print(f"\nUser: {stream_text}")
    print("Agent: ", end="", flush=True)
    async for update in agent.run(stream_text, stream=True):
        if update.text:
            print(update.text, end="", flush=True)
    print()

    # Two turns sharing one thread
    print("\n--- Using Custom Agent with Thread ---")
    thread = agent.get_new_thread()
    for turn_text in ("First message", "Second message"):
        turn_reply = await agent.run(turn_text, thread=thread)
        print(f"User: {turn_text}")
        print(f"Agent: {turn_reply.messages[0].text}")

    # Report how many messages the thread recorded, if it has a store
    store = thread.message_store
    if not store:
        print("\nThread has no message store configured")
    else:
        history = await store.list_messages()
        print(f"\nThread contains {len(history)} messages in history")
if __name__ == "__main__":
    # Start the async demo only when this file is executed as a script, not on import.
    asyncio.run(main())