Files
agent-framework/python/samples/02-agents/middleware/chat_middleware.py
Eduard van Valkenburg a2856d3b92 Python: restructure: Python samples into progressive 01-05 layout (#3862)
* restructure: Python samples into progressive 01-05 layout

- 01-get-started/: 6 numbered steps (hello agent → hosting)
- 02-agents/: all agent concept samples (tools, middleware, providers, etc.)
- 03-workflows/: ALL existing workflow samples preserved as-is
- 04-hosting/: azure-functions, durabletask, a2a
- 05-end-to-end/: demos, evaluation, hosted agents
- Old files moved to _to_delete/ for review
- Added AGENTS.md with structure documentation
- autogen-migration/ and semantic-kernel-migration/ preserved at root

* fix: switch to AzureOpenAI Foundry, fix CI failures

- Switch all 01-get-started samples to AzureOpenAIResponsesClient with
  Azure AI Foundry project endpoint (AZURE_AI_PROJECT_ENDPOINT +
  AZURE_OPENAI_RESPONSES_DEPLOYMENT_NAME + AzureCliCredential)
- Add _to_delete/ and 05-end-to-end/ to pyrightconfig.samples.json excludes
- Fix test paths in packages/ that referenced old getting_started/ dirs:
  durabletask conftest + streaming test, azurefunctions conftest,
  devui conftest + capture_messages + openai_sdk_integration
- Fix workflow_as_agent_human_in_the_loop.py import (sibling import)
- Update hosting READMEs and tool comment paths
- Replace root README.md with new structure overview
- Update AGENTS.md to document Azure OpenAI Foundry as default provider

* cleanup: remove _to_delete folder, copy resource files to active dirs

All files in _to_delete/ were either:
- Exact duplicates of files in the new structure (240 files)
- Same file with only comment path updates (100 files)
- One import-fix diff (workflow_as_agent_human_in_the_loop.py)
- One superseded minimal_sample.py

Resource files (sample.pdf, countries.json, employees.pdf, weather.json)
copied to 02-agents/sample_assets/ and 02-agents/resources/ since active
samples reference them.

* fix: address PR review comments, centralize resources, remove root duplicates

- Fix type annotation in 04_memory.py (string union -> proper types)
- Fix old sample paths in observability files
- Fix grammar/spelling in observability samples
- Move sample_assets/ and resources/ to shared/ folder
- Remove 8 duplicate observability files from 02-agents root
- Update resource path references in multimodal_input and provider samples

* fix: update broken links from old getting_started paths to new structure

- Update relative paths in READMEs: getting_started/ → 01-get-started/,
  02-agents/, 03-workflows/, 04-hosting/, 05-end-to-end/
- Fix absolute GitHub URLs in package READMEs
- Fix broken link in ollama package README

* fix: convert absolute GitHub URLs to relative paths for link checker

Absolute URLs to python/samples/ on main branch 404 until PR merges.
Converted to relative paths that linkspector can verify locally.

* fix: update link for handoff sample moved to orchestrations/

* fix: update chatkit-integration README path from demos/ to 05-end-to-end/

* fix: update broken links in orchestrations README to match flat directory structure
2026-02-12 17:36:36 +00:00

248 lines
9.1 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
import asyncio
from collections.abc import Awaitable, Callable
from random import randint
from typing import Annotated
from agent_framework import (
ChatContext,
ChatMiddleware,
ChatResponse,
Message,
MiddlewareTermination,
chat_middleware,
tool,
)
from agent_framework.azure import AzureAIAgentClient
from azure.identity.aio import AzureCliCredential
from pydantic import Field
"""
Chat Middleware Example
This sample demonstrates how to use chat middleware to observe and override
inputs sent to AI models. Chat middleware intercepts chat requests before they reach
the underlying AI service, allowing you to:
1. Observe and log input messages
2. Modify input messages before sending to AI
3. Override the entire response
The example covers:
- Class-based chat middleware inheriting from ChatMiddleware
- Function-based chat middleware with @chat_middleware decorator
- Middleware registration at agent level (applies to all runs)
- Middleware registration at run level (applies to specific run only)
"""
# NOTE: approval_mode="never_require" is for sample brevity. Use "always_require" in production; see samples/02-agents/tools/function_tool_with_approval.py and samples/02-agents/tools/function_tool_with_approval_and_threads.py.
@tool(approval_mode="never_require")
def get_weather(
    location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
    """Get the weather for a given location."""
    # NOTE(review): the docstring above is presumably surfaced by @tool as the
    # tool description - confirm before rewording it.
    #
    # Simulated data only: no weather service is contacted. Each call picks a
    # random condition and a random high between 10 and 30 (inclusive).
    sky_options = ("sunny", "cloudy", "rainy", "stormy")
    # Draw the condition first and the temperature second so the sequence of
    # random draws stays in the same order as the formatted sentence.
    sky = sky_options[randint(0, len(sky_options) - 1)]
    high_celsius = randint(10, 30)
    return f"The weather in {location} is {sky} with a high of {high_celsius}°C."
class InputObserverMiddleware(ChatMiddleware):
    """Class-based chat middleware that logs input messages and optionally rewrites user text.

    Every message in the outgoing request is printed. When a ``replacement`` is
    configured, each user message that has text is swapped for a new message
    carrying the replacement text. Without a replacement the request is
    forwarded exactly as received.
    """

    def __init__(self, replacement: str | None = None):
        """Initialize the middleware.

        Args:
            replacement: Text substituted for every user message that has text.
                ``None`` (or an empty string) leaves all messages unchanged.
        """
        self.replacement = replacement

    async def process(
        self,
        context: ChatContext,
        call_next: Callable[[], Awaitable[None]],
    ) -> None:
        """Observe and optionally modify input messages before they are sent to AI.

        Args:
            context: Chat context whose ``messages`` list is logged and, when a
                replacement is configured, updated in place.
            call_next: Callback awaited to continue to the next middleware or AI execution.
        """
        print("[InputObserverMiddleware] Observing input messages:")
        for i, message in enumerate(context.messages):
            # Fall back to the raw contents when the message has no text.
            content = message.text if message.text else str(message.contents)
            print(f" Message {i + 1} ({message.role}): {content}")
        print(f"[InputObserverMiddleware] Total messages: {len(context.messages)}")

        # Only rebuild messages when there is something to substitute. A rebuilt
        # message is constructed from the role and text alone, so rebuilding
        # unconditionally would needlessly discard anything else the original
        # message carried.
        if self.replacement:
            rewritten: list[Message] = []
            for message in context.messages:
                if message.role == "user" and message.text:
                    print(f"[InputObserverMiddleware] Updated: '{message.text}' -> '{self.replacement}'")
                    rewritten.append(Message(message.role, [self.replacement]))
                else:
                    rewritten.append(message)
            # Slice assignment mutates the existing list object held by the context.
            context.messages[:] = rewritten

        # Continue to next middleware or AI execution
        await call_next()

        # Observe that processing is complete
        print("[InputObserverMiddleware] Processing completed")
@chat_middleware
async def security_and_override_middleware(
    context: ChatContext,
    call_next: Callable[[], Awaitable[None]],
) -> None:
    """Function-based middleware that refuses requests mentioning sensitive terms.

    Each message's text is lower-cased and searched for a fixed list of blocked
    terms. On the first hit the middleware sets ``context.result`` to a canned
    assistant refusal and raises ``MiddlewareTermination`` without awaiting
    ``call_next``. If nothing matches, ``call_next`` is awaited.

    Args:
        context: Chat context providing the input ``messages`` and receiving the
            overridden ``result`` when a request is blocked.
        call_next: Callback awaited to continue to the next middleware or AI execution.

    Raises:
        MiddlewareTermination: When a blocked term is found in any message.
    """
    print("[SecurityMiddleware] Processing input...")

    # Substring match, so e.g. "token" also matches "tokens".
    blocked_terms = ["password", "secret", "api_key", "token"]

    for message in context.messages:
        text = message.text
        if not text:
            continue

        lowered = text.lower()
        # First blocked term (in list order) contained in this message, if any.
        hit = next((term for term in blocked_terms if term in lowered), None)
        if hit is None:
            continue

        print(f"[SecurityMiddleware] BLOCKED: Found '{hit}' in message")
        # Override the response instead of calling AI
        context.result = ChatResponse(
            messages=[
                Message(
                    role="assistant",
                    text="I cannot process requests containing sensitive information. "
                    "Please rephrase your question without including passwords, secrets, or other "
                    "sensitive data.",
                )
            ]
        )
        # Raising stops here: call_next below is never reached for this request.
        raise MiddlewareTermination

    # Nothing sensitive found: continue to next middleware or AI execution
    await call_next()
async def class_based_chat_middleware() -> None:
    """Run one weather query through an agent with class-based middleware registered at agent level."""
    divider = "=" * 60
    print("\n" + divider)
    print("Class-based Chat MiddlewareTypes (Agent Level)")
    print(divider)

    # For authentication, run `az login` command in terminal or replace AzureCliCredential with preferred
    # authentication option.
    async with AzureCliCredential() as credential:
        async with AzureAIAgentClient(credential=credential).as_agent(
            name="EnhancedChatAgent",
            instructions="You are a helpful AI assistant.",
            # Agent-level registration: the middleware is passed when the agent is created,
            # not per run.
            middleware=[InputObserverMiddleware()],
            tools=get_weather,
        ) as agent:
            question = "What's the weather in Seattle?"
            print(f"User: {question}")
            reply = await agent.run(question)
            print(f"Final Response: {reply.text or 'No response'}")
async def function_based_chat_middleware() -> None:
    """Run a normal query and a blocked query through an agent using the function-based middleware."""
    divider = "=" * 60
    print("\n" + divider)
    print("Function-based Chat MiddlewareTypes (Agent Level)")
    print(divider)

    # (heading, user query) pairs, executed in order against the same agent.
    scenarios = (
        ("\n--- Scenario 1: Normal Query ---", "Hello, how are you?"),
        ("\n--- Scenario 2: Security Violation ---", "What is my password for this account?"),
    )

    async with AzureCliCredential() as credential:
        async with AzureAIAgentClient(credential=credential).as_agent(
            name="FunctionMiddlewareAgent",
            instructions="You are a helpful AI assistant.",
            # Register function-based middleware at agent level
            middleware=[security_and_override_middleware],
        ) as agent:
            for heading, question in scenarios:
                print(heading)
                print(f"User: {question}")
                reply = await agent.run(question)
                print(f"Final Response: {reply.text or 'No response'}")
async def run_level_middleware() -> None:
    """Show middleware supplied per ``agent.run`` call on an agent created without any middleware."""
    divider = "=" * 60
    print("\n" + divider)
    print("Run-level Chat MiddlewareTypes")
    print(divider)

    weather_question = "What's the weather in Tokyo?"
    sensitive_question = "Can you help me with my secret API key?"

    async with AzureCliCredential() as credential:
        async with AzureAIAgentClient(credential=credential).as_agent(
            name="RunLevelAgent",
            instructions="You are a helpful AI assistant.",
            tools=get_weather,
            # No middleware at agent level
        ) as agent:
            # Scenario 1: plain run, no middleware involved
            print("\n--- Scenario 1: No MiddlewareTypes ---")
            print(f"User: {weather_question}")
            reply = await agent.run(weather_question)
            print(f"Response: {reply.text or 'No response'}")

            # Scenario 2: same question, but this call alone gets both the
            # input-rewriting middleware and the security middleware
            print("\n--- Scenario 2: With Run-level MiddlewareTypes ---")
            print(f"User: {weather_question}")
            reply = await agent.run(
                weather_question,
                middleware=[
                    InputObserverMiddleware(replacement="What's the weather in Madrid?"),
                    security_and_override_middleware,
                ],
            )
            print(f"Response: {reply.text or 'No response'}")

            # Scenario 3: a question containing a blocked term, with only the
            # security middleware attached to this call
            print("\n--- Scenario 3: Security Test with Run-level MiddlewareTypes ---")
            print(f"User: {sensitive_question}")
            reply = await agent.run(
                sensitive_question,
                middleware=[security_and_override_middleware],
            )
            print(f"Response: {reply.text or 'No response'}")
async def main() -> None:
    """Print the sample banner, then run each chat middleware demo in sequence."""
    print("Chat MiddlewareTypes Examples")
    print("========================")

    # Demos are awaited one after another, in this order.
    demos = (
        class_based_chat_middleware,
        function_based_chat_middleware,
        run_level_middleware,
    )
    for demo in demos:
        await demo()


# Only start the event loop when the file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())