Files
agent-framework/python/samples/getting_started/middleware/chat_middleware.py
T
Eduard van Valkenburg 838a7fd61d Python: [BREAKING] Types API Review improvements (#3647)
* Replace Role and FinishReason classes with NewType + Literal

- Remove EnumLike metaclass from _types.py
- Replace Role class with NewType('Role', str) + RoleLiteral
- Replace FinishReason class with NewType('FinishReason', str) + FinishReasonLiteral
- Update all usages across codebase to use string literals
- Remove .value access patterns (direct string comparison now works)
- Add backward compatibility for legacy dict serialization format
- Update tests to reflect new string-based types

Addresses #3591, #3615

* Simplify ChatResponse and AgentResponse type hints (#3592)

- Remove overloads from ChatResponse.__init__
- Remove text parameter from ChatResponse.__init__
- Remove | dict[str, Any] from finish_reason and usage_details params
- Remove **kwargs from AgentResponse.__init__
- Both now accept ChatMessage | Sequence[ChatMessage] | None for messages
- Update docstrings and examples to reflect changes
- Fix tests that were using removed kwargs
- Fix Role type hint usage in ag-ui utils

* Remove text parameter from ChatResponseUpdate and AgentResponseUpdate (#3597)

- Remove text parameter from ChatResponseUpdate.__init__
- Remove text parameter from AgentResponseUpdate.__init__
- Remove **kwargs from both update classes
- Simplify contents parameter type to Sequence[Content] | None
- Update all usages to use contents=[Content.from_text(...)] pattern
- Fix imports in test files
- Update docstrings and examples

* Rename from_chat_response_updates to from_updates (#3593)

- ChatResponse.from_chat_response_updates → ChatResponse.from_updates
- ChatResponse.from_chat_response_generator → ChatResponse.from_update_generator
- AgentResponse.from_agent_run_response_updates → AgentResponse.from_updates

* Remove try_parse_value method from ChatResponse and AgentResponse (#3595)

- Remove try_parse_value method from ChatResponse
- Remove try_parse_value method from AgentResponse
- Remove try_parse_value calls from from_updates and from_update_generator methods
- Update samples to use try/except with response.value instead
- Update tests to use response.value pattern
- Users should now use response.value with try/except for safe parsing

* Add agent_id to AgentResponse and clarify author_name documentation (#3596)

- Add agent_id parameter to AgentResponse class
- Document that author_name is on ChatMessage objects, not responses
- Update ChatResponse docstring with author_name note
- Update AgentResponse docstring with author_name note

* Simplify ChatMessage.__init__ signature (#3618)

- Make contents a positional argument accepting Sequence[Content | str]
- Auto-convert strings in contents to TextContent
- Remove overloads, keep text kwarg for backward compatibility with serialization
- Update _parse_content_list to handle string items
- Update all usages across codebase to use new format: ChatMessage("role", ["text"])

* Allow Content as input on run and get_response

- Update prepare_messages and normalize_messages to accept Content
- Update type signatures in _agents.py and _clients.py
- Add tests for Content input handling

* Fix ChatMessage usage across packages and samples

Update all remaining ChatMessage(role=..., text=...) to use new
ChatMessage('role', ['text']) signature.

* Fix Role string usage and response format parsing

- Fix redis provider: remove .value access on string literals
- Fix durabletask ensure_response_format: set _response_format before accessing .value

* Fix ollama .value and ai_model_id issues, handle None in content list

- Fix ollama _chat_client: remove .value on string literals
- Fix ollama _chat_client: rename ai_model_id to model_id
- Fix _parse_content_list: skip None values gracefully

* Fix A2AAgent type signature to include Content

* Fix Role/FinishReason NewType dict annotations and improve test coverage to 95%

* Fix mypy errors for Role/FinishReason NewType usage

* Fix Role.TOOL and Role.ASSISTANT usage in _orchestrator_helpers.py

* Fix Role NewType usage in durabletask _models.py
2026-02-04 10:13:23 +00:00

248 lines
9.1 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
import asyncio
from collections.abc import Awaitable, Callable
from random import randint
from typing import Annotated
from agent_framework import (
ChatContext,
ChatMessage,
ChatMiddleware,
ChatResponse,
chat_middleware,
tool,
)
from agent_framework.azure import AzureAIAgentClient
from azure.identity.aio import AzureCliCredential
from pydantic import Field
"""
Chat Middleware Example
This sample demonstrates how to use chat middleware to observe and override
inputs sent to AI models. Chat middleware intercepts chat requests before they reach
the underlying AI service, allowing you to:
1. Observe and log input messages
2. Modify input messages before sending to AI
3. Override the entire response
The example covers:
- Class-based chat middleware inheriting from ChatMiddleware
- Function-based chat middleware with @chat_middleware decorator
- Middleware registration at agent level (applies to all runs)
- Middleware registration at run level (applies to specific run only)
"""
# NOTE: approval_mode="never_require" is for sample brevity. Use "always_require" in production; see samples/getting_started/tools/function_tool_with_approval.py and samples/getting_started/tools/function_tool_with_approval_and_threads.py.
@tool(approval_mode="never_require")
def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
class InputObserverMiddleware(ChatMiddleware):
"""Class-based middleware that observes and modifies input messages."""
def __init__(self, replacement: str | None = None):
"""Initialize with a replacement for user messages."""
self.replacement = replacement
async def process(
self,
context: ChatContext,
next: Callable[[ChatContext], Awaitable[None]],
) -> None:
"""Observe and modify input messages before they are sent to AI."""
print("[InputObserverMiddleware] Observing input messages:")
for i, message in enumerate(context.messages):
content = message.text if message.text else str(message.contents)
print(f" Message {i + 1} ({message.role}): {content}")
print(f"[InputObserverMiddleware] Total messages: {len(context.messages)}")
# Modify user messages by creating new messages with enhanced text
modified_messages: list[ChatMessage] = []
modified_count = 0
for message in context.messages:
if message.role == "user" and message.text:
original_text = message.text
updated_text = original_text
if self.replacement:
updated_text = self.replacement
print(f"[InputObserverMiddleware] Updated: '{original_text}' -> '{updated_text}'")
modified_message = ChatMessage(message.role, [updated_text])
modified_messages.append(modified_message)
modified_count += 1
else:
modified_messages.append(message)
# Replace messages in context
context.messages[:] = modified_messages
# Continue to next middleware or AI execution
await next(context)
# Observe that processing is complete
print("[InputObserverMiddleware] Processing completed")
@chat_middleware
async def security_and_override_middleware(
context: ChatContext,
next: Callable[[ChatContext], Awaitable[None]],
) -> None:
"""Function-based middleware that implements security filtering and response override."""
print("[SecurityMiddleware] Processing input...")
# Security check - block sensitive information
blocked_terms = ["password", "secret", "api_key", "token"]
for message in context.messages:
if message.text:
message_lower = message.text.lower()
for term in blocked_terms:
if term in message_lower:
print(f"[SecurityMiddleware] BLOCKED: Found '{term}' in message")
# Override the response instead of calling AI
context.result = ChatResponse(
messages=[
ChatMessage(
role="assistant",
text="I cannot process requests containing sensitive information. "
"Please rephrase your question without including passwords, secrets, or other "
"sensitive data.",
)
]
)
# Set terminate flag to stop execution
context.terminate = True
return
# Continue to next middleware or AI execution
await next(context)
async def class_based_chat_middleware() -> None:
"""Demonstrate class-based middleware at agent level."""
print("\n" + "=" * 60)
print("Class-based Chat Middleware (Agent Level)")
print("=" * 60)
# For authentication, run `az login` command in terminal or replace AzureCliCredential with preferred
# authentication option.
async with (
AzureCliCredential() as credential,
AzureAIAgentClient(credential=credential).as_agent(
name="EnhancedChatAgent",
instructions="You are a helpful AI assistant.",
# Register class-based middleware at agent level (applies to all runs)
middleware=[InputObserverMiddleware()],
tools=get_weather,
) as agent,
):
query = "What's the weather in Seattle?"
print(f"User: {query}")
result = await agent.run(query)
print(f"Final Response: {result.text if result.text else 'No response'}")
async def function_based_chat_middleware() -> None:
"""Demonstrate function-based middleware at agent level."""
print("\n" + "=" * 60)
print("Function-based Chat Middleware (Agent Level)")
print("=" * 60)
async with (
AzureCliCredential() as credential,
AzureAIAgentClient(credential=credential).as_agent(
name="FunctionMiddlewareAgent",
instructions="You are a helpful AI assistant.",
# Register function-based middleware at agent level
middleware=[security_and_override_middleware],
) as agent,
):
# Scenario with normal query
print("\n--- Scenario 1: Normal Query ---")
query = "Hello, how are you?"
print(f"User: {query}")
result = await agent.run(query)
print(f"Final Response: {result.text if result.text else 'No response'}")
# Scenario with security violation
print("\n--- Scenario 2: Security Violation ---")
query = "What is my password for this account?"
print(f"User: {query}")
result = await agent.run(query)
print(f"Final Response: {result.text if result.text else 'No response'}")
async def run_level_middleware() -> None:
"""Demonstrate middleware registration at run level."""
print("\n" + "=" * 60)
print("Run-level Chat Middleware")
print("=" * 60)
async with (
AzureCliCredential() as credential,
AzureAIAgentClient(credential=credential).as_agent(
name="RunLevelAgent",
instructions="You are a helpful AI assistant.",
tools=get_weather,
# No middleware at agent level
) as agent,
):
# Scenario 1: Run without any middleware
print("\n--- Scenario 1: No Middleware ---")
query = "What's the weather in Tokyo?"
print(f"User: {query}")
result = await agent.run(query)
print(f"Response: {result.text if result.text else 'No response'}")
# Scenario 2: Run with specific middleware for this call only (both enhancement and security)
print("\n--- Scenario 2: With Run-level Middleware ---")
print(f"User: {query}")
result = await agent.run(
query,
middleware=[
InputObserverMiddleware(replacement="What's the weather in Madrid?"),
security_and_override_middleware,
],
)
print(f"Response: {result.text if result.text else 'No response'}")
# Scenario 3: Security test with run-level middleware
print("\n--- Scenario 3: Security Test with Run-level Middleware ---")
query = "Can you help me with my secret API key?"
print(f"User: {query}")
result = await agent.run(
query,
middleware=[security_and_override_middleware],
)
print(f"Response: {result.text if result.text else 'No response'}")
async def main() -> None:
"""Run all chat middleware examples."""
print("Chat Middleware Examples")
print("========================")
await class_based_chat_middleware()
await function_based_chat_middleware()
await run_level_middleware()
if __name__ == "__main__":
asyncio.run(main())