Files
agent-framework/python/samples/getting_started/agents/custom/custom_agent.py
T
Eduard van Valkenburg 0521f5bed8 Python: [BREAKING] Simplify API: ChatAgent -> Agent, ChatMessage -> Message (#3747)
* [BREAKING] Rename ChatAgent -> Agent, ChatMessage -> Message, ChatClientProtocol -> SupportsChatGetResponse

Simplify the public API by removing redundant 'Chat' prefix from core types:
- ChatAgent -> Agent
- RawChatAgent -> RawAgent
- ChatMessage -> Message
- ChatClientProtocol -> SupportsChatGetResponse

Also renamed internal WorkflowMessage (was Message in _runner_context) to avoid collision.

No backward compatibility aliases - this is a clean breaking change.

* [BREAKING] Rename Agent chat_client parameter to client

* Fix rebase issues: WorkflowMessage references and broken markdown links

* Fix formatting and lint issues from code quality checks

* Fix import ordering in workflow sample files

* fixed rebase

* Fix test failures: use WorkflowMessage and A2AMessage after ChatMessage→Message rename

- Replace Message(data=..., source_id=...) with WorkflowMessage(...) in workflow tests
- Fix isinstance check in A2A agent to use A2AMessage instead of Message
- Fix import in test_workflow_observability.py (Message→WorkflowMessage)

* Fix lint, fmt, and sample errors after ChatMessage→Message rename

- Auto-fix 70+ ruff lint issues across samples (ChatMessage→Message refs)
- Fix HostedVectorStoreContent→Content.from_hosted_vector_store in file search sample
- Fix _normalize_messages→normalize_messages in custom agent sample
- Fix context.terminate→raise MiddlewareTermination in middleware samples
- Fix with_update_hook→with_transform_hook in override middleware sample
- Add TOptions_co import back to custom_chat_client sample
- Add noqa for FastAPI File() default in chatkit sample
- Fix B023 loop variable capture in weather agent sample

* fix: update Agent constructor calls from chat_client to client in declaration-only tool tests

* fix: add register_cleanup to devui lazy-loading proxy and type stub

* fixed tests and updated new pieces

* fix agui typevar

* fix merge errors

* fix merge conflicts

* fiux merge

* Remove unused links

---------

Co-authored-by: Evan Mattson <evan.mattson@microsoft.com>
2026-02-10 23:04:32 +00:00

207 lines
6.9 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
import asyncio
from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
AgentResponse,
AgentResponseUpdate,
AgentThread,
BaseAgent,
Content,
Message,
Role,
normalize_messages,
)
"""
Custom Agent Implementation Example
This sample demonstrates implementing a custom agent by extending BaseAgent class,
showing the minimal requirements for both streaming and non-streaming responses.
"""
class EchoAgent(BaseAgent):
"""A simple custom agent that echoes user messages with a prefix.
This demonstrates how to create a fully custom agent by extending BaseAgent
and implementing the required run() method with stream support.
"""
echo_prefix: str = "Echo: "
def __init__(
self,
*,
name: str | None = None,
description: str | None = None,
echo_prefix: str = "Echo: ",
**kwargs: Any,
) -> None:
"""Initialize the EchoAgent.
Args:
name: The name of the agent.
description: The description of the agent.
echo_prefix: The prefix to add to echoed messages.
**kwargs: Additional keyword arguments passed to BaseAgent.
"""
super().__init__(
name=name,
description=description,
echo_prefix=echo_prefix, # type: ignore
**kwargs,
)
def run(
self,
messages: str | Message | list[str] | list[Message] | None = None,
*,
stream: bool = False,
thread: AgentThread | None = None,
**kwargs: Any,
) -> "AsyncIterable[AgentResponseUpdate] | asyncio.Future[AgentResponse]":
"""Execute the agent and return a response.
Args:
messages: The message(s) to process.
stream: If True, return an async iterable of updates. If False, return an awaitable response.
thread: The conversation thread (optional).
**kwargs: Additional keyword arguments.
Returns:
When stream=False: An awaitable AgentResponse containing the agent's reply.
When stream=True: An async iterable of AgentResponseUpdate objects.
"""
if stream:
return self._run_stream(messages=messages, thread=thread, **kwargs)
return self._run(messages=messages, thread=thread, **kwargs)
async def _run(
self,
messages: str | Message | list[str] | list[Message] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AgentResponse:
"""Non-streaming implementation."""
# Normalize input messages to a list
normalized_messages = normalize_messages(messages)
if not normalized_messages:
response_message = Message(
role=Role.ASSISTANT,
contents=[Content.from_text(text="Hello! I'm a custom echo agent. Send me a message and I'll echo it back.")],
)
else:
# For simplicity, echo the last user message
last_message = normalized_messages[-1]
if last_message.text:
echo_text = f"{self.echo_prefix}{last_message.text}"
else:
echo_text = f"{self.echo_prefix}[Non-text message received]"
response_message = Message(role=Role.ASSISTANT, contents=[Content.from_text(text=echo_text)])
# Notify the thread of new messages if provided
if thread is not None:
await self._notify_thread_of_new_messages(thread, normalized_messages, response_message)
return AgentResponse(messages=[response_message])
async def _run_stream(
self,
messages: str | Message | list[str] | list[Message] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentResponseUpdate]:
"""Streaming implementation."""
# Normalize input messages to a list
normalized_messages = normalize_messages(messages)
if not normalized_messages:
response_text = "Hello! I'm a custom echo agent. Send me a message and I'll echo it back."
else:
# For simplicity, echo the last user message
last_message = normalized_messages[-1]
if last_message.text:
response_text = f"{self.echo_prefix}{last_message.text}"
else:
response_text = f"{self.echo_prefix}[Non-text message received]"
# Simulate streaming by yielding the response word by word
words = response_text.split()
for i, word in enumerate(words):
# Add space before word except for the first one
chunk_text = f" {word}" if i > 0 else word
yield AgentResponseUpdate(
contents=[Content.from_text(text=chunk_text)],
role=Role.ASSISTANT,
)
# Small delay to simulate streaming
await asyncio.sleep(0.1)
# Notify the thread of the complete response if provided
if thread is not None:
complete_response = Message(role=Role.ASSISTANT, contents=[Content.from_text(text=response_text)])
await self._notify_thread_of_new_messages(thread, normalized_messages, complete_response)
async def main() -> None:
"""Demonstrates how to use the custom EchoAgent."""
print("=== Custom Agent Example ===\n")
# Create EchoAgent
print("--- EchoAgent Example ---")
echo_agent = EchoAgent(
name="EchoBot", description="A simple agent that echoes messages with a prefix", echo_prefix="🔊 Echo: "
)
# Test non-streaming
print(f"Agent Name: {echo_agent.name}")
print(f"Agent ID: {echo_agent.id}")
query = "Hello, custom agent!"
print(f"\nUser: {query}")
result = await echo_agent.run(query)
print(f"Agent: {result.messages[0].text}")
# Test streaming
query2 = "This is a streaming test"
print(f"\nUser: {query2}")
print("Agent: ", end="", flush=True)
async for chunk in echo_agent.run(query2, stream=True):
if chunk.text:
print(chunk.text, end="", flush=True)
print()
# Example with threads
print("\n--- Using Custom Agent with Thread ---")
thread = echo_agent.get_new_thread()
# First message
result1 = await echo_agent.run("First message", thread=thread)
print("User: First message")
print(f"Agent: {result1.messages[0].text}")
# Second message in same thread
result2 = await echo_agent.run("Second message", thread=thread)
print("User: Second message")
print(f"Agent: {result2.messages[0].text}")
# Check conversation history
if thread.message_store:
messages = await thread.message_store.list_messages()
print(f"\nThread contains {len(messages)} messages in history")
else:
print("\nThread has no message store configured")
if __name__ == "__main__":
asyncio.run(main())