Files
agent-framework/python/samples/getting_started/agents/custom/custom_agent.py
T
Eduard van Valkenburg 390f93344c Python: Add samples syntax checking with pyright (#3710)
* Add samples syntax checking with pyright

- Add pyrightconfig.samples.json with relaxed type checking but import validation
- Add samples-syntax poe task to check samples for syntax and import errors
- Add samples-syntax to check and pre-commit-check tasks
- Fix 78 sample errors:
  - Update workflow builder imports to use agent_framework_orchestrations
  - Change content type isinstance checks to content.type comparisons
  - Use Content factory methods instead of removed content type classes
  - Fix TypedDict access patterns for Annotation
  - Fix various API mismatches (normalize_messages, ChatMessage.text, role)

* fixed a bunch of samples and tweaks to pre-commit

* updated lock

* updated lock

* fixes

* added lint to samples
2026-02-07 07:10:47 +00:00

209 lines
7.0 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
import asyncio
from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
AgentResponse,
AgentResponseUpdate,
AgentThread,
BaseAgent,
ChatMessage,
Content,
Role,
normalize_messages,
)
"""
Custom Agent Implementation Example
This sample demonstrates implementing a custom agent by extending BaseAgent class,
showing the minimal requirements for both streaming and non-streaming responses.
"""
class EchoAgent(BaseAgent):
"""A simple custom agent that echoes user messages with a prefix.
This demonstrates how to create a fully custom agent by extending BaseAgent
and implementing the required run() method with stream support.
"""
echo_prefix: str = "Echo: "
def __init__(
self,
*,
name: str | None = None,
description: str | None = None,
echo_prefix: str = "Echo: ",
**kwargs: Any,
) -> None:
"""Initialize the EchoAgent.
Args:
name: The name of the agent.
description: The description of the agent.
echo_prefix: The prefix to add to echoed messages.
**kwargs: Additional keyword arguments passed to BaseAgent.
"""
super().__init__(
name=name,
description=description,
echo_prefix=echo_prefix, # type: ignore
**kwargs,
)
def run(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
stream: bool = False,
thread: AgentThread | None = None,
**kwargs: Any,
) -> "AsyncIterable[AgentResponseUpdate] | asyncio.Future[AgentResponse]":
"""Execute the agent and return a response.
Args:
messages: The message(s) to process.
stream: If True, return an async iterable of updates. If False, return an awaitable response.
thread: The conversation thread (optional).
**kwargs: Additional keyword arguments.
Returns:
When stream=False: An awaitable AgentResponse containing the agent's reply.
When stream=True: An async iterable of AgentResponseUpdate objects.
"""
if stream:
return self._run_stream(messages=messages, thread=thread, **kwargs)
return self._run(messages=messages, thread=thread, **kwargs)
async def _run(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AgentResponse:
"""Non-streaming implementation."""
# Normalize input messages to a list
normalized_messages = normalize_messages(messages)
if not normalized_messages:
response_message = ChatMessage(
role=Role.ASSISTANT,
contents=[
Content.from_text(text="Hello! I'm a custom echo agent. Send me a message and I'll echo it back.")
],
)
else:
# For simplicity, echo the last user message
last_message = normalized_messages[-1]
if last_message.text:
echo_text = f"{self.echo_prefix}{last_message.text}"
else:
echo_text = f"{self.echo_prefix}[Non-text message received]"
response_message = ChatMessage(role=Role.ASSISTANT, contents=[Content.from_text(text=echo_text)])
# Notify the thread of new messages if provided
if thread is not None:
await self._notify_thread_of_new_messages(thread, normalized_messages, response_message)
return AgentResponse(messages=[response_message])
async def _run_stream(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentResponseUpdate]:
"""Streaming implementation."""
# Normalize input messages to a list
normalized_messages = normalize_messages(messages)
if not normalized_messages:
response_text = "Hello! I'm a custom echo agent. Send me a message and I'll echo it back."
else:
# For simplicity, echo the last user message
last_message = normalized_messages[-1]
if last_message.text:
response_text = f"{self.echo_prefix}{last_message.text}"
else:
response_text = f"{self.echo_prefix}[Non-text message received]"
# Simulate streaming by yielding the response word by word
words = response_text.split()
for i, word in enumerate(words):
# Add space before word except for the first one
chunk_text = f" {word}" if i > 0 else word
yield AgentResponseUpdate(
contents=[Content.from_text(text=chunk_text)],
role=Role.ASSISTANT,
)
# Small delay to simulate streaming
await asyncio.sleep(0.1)
# Notify the thread of the complete response if provided
if thread is not None:
complete_response = ChatMessage(role=Role.ASSISTANT, contents=[Content.from_text(text=response_text)])
await self._notify_thread_of_new_messages(thread, normalized_messages, complete_response)
async def main() -> None:
"""Demonstrates how to use the custom EchoAgent."""
print("=== Custom Agent Example ===\n")
# Create EchoAgent
print("--- EchoAgent Example ---")
echo_agent = EchoAgent(
name="EchoBot", description="A simple agent that echoes messages with a prefix", echo_prefix="🔊 Echo: "
)
# Test non-streaming
print(f"Agent Name: {echo_agent.name}")
print(f"Agent ID: {echo_agent.id}")
query = "Hello, custom agent!"
print(f"\nUser: {query}")
result = await echo_agent.run(query)
print(f"Agent: {result.messages[0].text}")
# Test streaming
query2 = "This is a streaming test"
print(f"\nUser: {query2}")
print("Agent: ", end="", flush=True)
async for chunk in echo_agent.run(query2, stream=True):
if chunk.text:
print(chunk.text, end="", flush=True)
print()
# Example with threads
print("\n--- Using Custom Agent with Thread ---")
thread = echo_agent.get_new_thread()
# First message
result1 = await echo_agent.run("First message", thread=thread)
print("User: First message")
print(f"Agent: {result1.messages[0].text}")
# Second message in same thread
result2 = await echo_agent.run("Second message", thread=thread)
print("User: Second message")
print(f"Agent: {result2.messages[0].text}")
# Check conversation history
if thread.message_store:
messages = await thread.message_store.list_messages()
print(f"\nThread contains {len(messages)} messages in history")
else:
print("\nThread has no message store configured")
if __name__ == "__main__":
asyncio.run(main())