Files
agent-framework/python/samples/getting_started/middleware/middleware_termination.py
T
Eduard van Valkenburg 838a7fd61d Python: [BREAKING] Types API Review improvements (#3647)
* Replace Role and FinishReason classes with NewType + Literal

- Remove EnumLike metaclass from _types.py
- Replace Role class with NewType('Role', str) + RoleLiteral
- Replace FinishReason class with NewType('FinishReason', str) + FinishReasonLiteral
- Update all usages across codebase to use string literals
- Remove .value access patterns (direct string comparison now works)
- Add backward compatibility for legacy dict serialization format
- Update tests to reflect new string-based types

Addresses #3591, #3615

* Simplify ChatResponse and AgentResponse type hints (#3592)

- Remove overloads from ChatResponse.__init__
- Remove text parameter from ChatResponse.__init__
- Remove | dict[str, Any] from finish_reason and usage_details params
- Remove **kwargs from AgentResponse.__init__
- Both now accept ChatMessage | Sequence[ChatMessage] | None for messages
- Update docstrings and examples to reflect changes
- Fix tests that were using removed kwargs
- Fix Role type hint usage in ag-ui utils

* Remove text parameter from ChatResponseUpdate and AgentResponseUpdate (#3597)

- Remove text parameter from ChatResponseUpdate.__init__
- Remove text parameter from AgentResponseUpdate.__init__
- Remove **kwargs from both update classes
- Simplify contents parameter type to Sequence[Content] | None
- Update all usages to use contents=[Content.from_text(...)] pattern
- Fix imports in test files
- Update docstrings and examples

* Rename from_chat_response_updates to from_updates (#3593)

- ChatResponse.from_chat_response_updates → ChatResponse.from_updates
- ChatResponse.from_chat_response_generator → ChatResponse.from_update_generator
- AgentResponse.from_agent_run_response_updates → AgentResponse.from_updates

* Remove try_parse_value method from ChatResponse and AgentResponse (#3595)

- Remove try_parse_value method from ChatResponse
- Remove try_parse_value method from AgentResponse
- Remove try_parse_value calls from from_updates and from_update_generator methods
- Update samples to use try/except with response.value instead
- Update tests to use response.value pattern
- Users should now use response.value with try/except for safe parsing

* Add agent_id to AgentResponse and clarify author_name documentation (#3596)

- Add agent_id parameter to AgentResponse class
- Document that author_name is on ChatMessage objects, not responses
- Update ChatResponse docstring with author_name note
- Update AgentResponse docstring with author_name note

* Simplify ChatMessage.__init__ signature (#3618)

- Make contents a positional argument accepting Sequence[Content | str]
- Auto-convert strings in contents to TextContent
- Remove overloads, keep text kwarg for backward compatibility with serialization
- Update _parse_content_list to handle string items
- Update all usages across codebase to use new format: ChatMessage("role", ["text"])

* Allow Content as input on run and get_response

- Update prepare_messages and normalize_messages to accept Content
- Update type signatures in _agents.py and _clients.py
- Add tests for Content input handling

* Fix ChatMessage usage across packages and samples

Update all remaining ChatMessage(role=..., text=...) to use new
ChatMessage('role', ['text']) signature.

* Fix Role string usage and response format parsing

- Fix redis provider: remove .value access on string literals
- Fix durabletask ensure_response_format: set _response_format before accessing .value

* Fix ollama .value and ai_model_id issues, handle None in content list

- Fix ollama _chat_client: remove .value on string literals
- Fix ollama _chat_client: rename ai_model_id to model_id
- Fix _parse_content_list: skip None values gracefully

* Fix A2AAgent type signature to include Content

* Fix Role/FinishReason NewType dict annotations and improve test coverage to 95%

* Fix mypy errors for Role/FinishReason NewType usage

* Fix Role.TOOL and Role.ASSISTANT usage in _orchestrator_helpers.py

* Fix Role NewType usage in durabletask _models.py
2026-02-04 10:13:23 +00:00

180 lines
6.6 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
import asyncio
from collections.abc import Awaitable, Callable
from random import randint
from typing import Annotated
from agent_framework import (
AgentMiddleware,
AgentResponse,
AgentRunContext,
ChatMessage,
tool,
)
from agent_framework.azure import AzureAIAgentClient
from azure.identity.aio import AzureCliCredential
from pydantic import Field
"""
Middleware Termination Example
This sample demonstrates how middleware can terminate execution using the `context.terminate` flag.
The example includes:
- PreTerminationMiddleware: Terminates execution before calling next() to prevent agent processing
- PostTerminationMiddleware: Allows processing to complete but terminates further execution
This is useful for implementing security checks, rate limiting, or early exit conditions.
"""
# NOTE: approval_mode="never_require" is for sample brevity. Use "always_require" in production; see samples/getting_started/tools/function_tool_with_approval.py and samples/getting_started/tools/function_tool_with_approval_and_threads.py.
@tool(approval_mode="never_require")
def get_weather(
location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
"""Get the weather for a given location."""
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
class PreTerminationMiddleware(AgentMiddleware):
"""Middleware that terminates execution before calling the agent."""
def __init__(self, blocked_words: list[str]):
self.blocked_words = [word.lower() for word in blocked_words]
async def process(
self,
context: AgentRunContext,
next: Callable[[AgentRunContext], Awaitable[None]],
) -> None:
# Check if the user message contains any blocked words
last_message = context.messages[-1] if context.messages else None
if last_message and last_message.text:
query = last_message.text.lower()
for blocked_word in self.blocked_words:
if blocked_word in query:
print(f"[PreTerminationMiddleware] Blocked word '{blocked_word}' detected. Terminating request.")
# Set a custom response
context.result = AgentResponse(
messages=[
ChatMessage(
role="assistant",
text=(
f"Sorry, I cannot process requests containing '{blocked_word}'. "
"Please rephrase your question."
),
)
]
)
# Set terminate flag to prevent further processing
context.terminate = True
break
await next(context)
class PostTerminationMiddleware(AgentMiddleware):
"""Middleware that allows processing but terminates after reaching max responses across multiple runs."""
def __init__(self, max_responses: int = 1):
self.max_responses = max_responses
self.response_count = 0
async def process(
self,
context: AgentRunContext,
next: Callable[[AgentRunContext], Awaitable[None]],
) -> None:
print(f"[PostTerminationMiddleware] Processing request (response count: {self.response_count})")
# Check if we should terminate before processing
if self.response_count >= self.max_responses:
print(
f"[PostTerminationMiddleware] Maximum responses ({self.max_responses}) reached. "
"Terminating further processing."
)
context.terminate = True
# Allow the agent to process normally
await next(context)
# Increment response count after processing
self.response_count += 1
async def pre_termination_middleware() -> None:
"""Demonstrate pre-termination middleware that blocks requests with certain words."""
print("\n--- Example 1: Pre-termination Middleware ---")
async with (
AzureCliCredential() as credential,
AzureAIAgentClient(credential=credential).as_agent(
name="WeatherAgent",
instructions="You are a helpful weather assistant.",
tools=get_weather,
middleware=[PreTerminationMiddleware(blocked_words=["bad", "inappropriate"])],
) as agent,
):
# Test with normal query
print("\n1. Normal query:")
query = "What's the weather like in Seattle?"
print(f"User: {query}")
result = await agent.run(query)
print(f"Agent: {result.text}")
# Test with blocked word
print("\n2. Query with blocked word:")
query = "What's the bad weather in New York?"
print(f"User: {query}")
result = await agent.run(query)
print(f"Agent: {result.text}")
async def post_termination_middleware() -> None:
"""Demonstrate post-termination middleware that limits responses across multiple runs."""
print("\n--- Example 2: Post-termination Middleware ---")
async with (
AzureCliCredential() as credential,
AzureAIAgentClient(credential=credential).as_agent(
name="WeatherAgent",
instructions="You are a helpful weather assistant.",
tools=get_weather,
middleware=[PostTerminationMiddleware(max_responses=1)],
) as agent,
):
# First run (should work)
print("\n1. First run:")
query = "What's the weather in Paris?"
print(f"User: {query}")
result = await agent.run(query)
print(f"Agent: {result.text}")
# Second run (should be terminated by middleware)
print("\n2. Second run (should be terminated):")
query = "What about the weather in London?"
print(f"User: {query}")
result = await agent.run(query)
print(f"Agent: {result.text if result.text else 'No response (terminated)'}")
# Third run (should also be terminated)
print("\n3. Third run (should also be terminated):")
query = "And New York?"
print(f"User: {query}")
result = await agent.run(query)
print(f"Agent: {result.text if result.text else 'No response (terminated)'}")
async def main() -> None:
"""Example demonstrating middleware termination functionality."""
print("=== Middleware Termination Example ===")
await pre_termination_middleware()
await post_termination_middleware()
if __name__ == "__main__":
asyncio.run(main())