agent-framework/python/samples/02-agents/middleware/override_result_with_middleware.py
Copilot b05fc9e849 Python: Add load_dotenv() to samples for .env file support (#4043)
* Initial plan

* Add load_dotenv() to 303 Python samples for environment variable loading

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Update SAMPLE_GUIDELINES.md to document load_dotenv() requirement

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Update samples README.md to document .env file usage

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Run ruff format on all changed sample files

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Clarify load_dotenv() usage in README - local dev vs production

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Remove deprecated getting_started folder as requested

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Document env_file_path parameter for per-client configuration

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Merge main branch to resolve conflicts

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Fix run_evaluation.py file that was empty in merge commit

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Remove dotnet changes from merge - out of scope for this PR

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Remove package and test changes from merge - only sample changes needed

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Remove test_func_utils.py - only sample changes needed

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Revert sample files not in original changeset - keep only load_dotenv additions

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Move load_dotenv() outside snippet tag in 06_host_your_agent.py

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Fix comment placement - move load_dotenv before code comments

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Fix load_dotenv() placement across all samples - after docstring, before code comments

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Merge latest main branch with load_dotenv changes

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Remove non-sample changes from merge - keep only load_dotenv additions

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Revert non-load_dotenv sample changes from merge

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Fix run_evaluation.py - use main's improved version (file already had load_dotenv)

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Manual update

* Manual update 2

* Fix Role usage and load_dotenv placement per PR review feedback

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Fix Role usage - use string literals not enum attributes

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Fix SAMPLE_GUIDELINES.md example - load_dotenv before docstring per guidance

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Move load_dotenv() before docstrings in all samples per SAMPLE_GUIDELINES ordering

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Address PR review: rename files, fix placement, add session usage, remove note

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Update Redis README to reference renamed file redis_history_provider.py

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>
Co-authored-by: Tao Chen <taochen@microsoft.com>
Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>
Co-authored-by: Eduard van Valkenburg <eavanvalkenburg@users.noreply.github.com>
2026-02-19 10:55:13 +00:00

221 lines
8.3 KiB
Python

# Copyright (c) Microsoft. All rights reserved.

import asyncio
import re
from collections.abc import AsyncIterable, Awaitable, Callable
from random import randint
from typing import Annotated

from agent_framework import (
    AgentContext,
    AgentResponse,
    AgentResponseUpdate,
    ChatContext,
    ChatResponse,
    ChatResponseUpdate,
    Content,
    Message,
    ResponseStream,
    tool,
)
from agent_framework.openai import OpenAIResponsesClient
from dotenv import load_dotenv
from pydantic import Field

# Load environment variables from .env file
load_dotenv()

"""
Result Override with Middleware (Regular and Streaming)

This sample demonstrates how to use middleware to intercept and modify results
after execution, supporting both regular and streaming agent responses. The example shows:

- How to let the original call execute first and then modify its result
- Replacing outputs with custom messages or transformed data
- Using middleware for result filtering, formatting, or enhancement
- Detecting streaming vs non-streaming execution using context.stream
- Overriding streaming results with custom async generators

The weather override middleware lets the original weather request execute normally,
then replaces its result with a custom "perfect weather" message. For streaming responses,
it creates a custom async generator that yields the override message in chunks.
"""


# NOTE: approval_mode="never_require" is for sample brevity. Use "always_require" in production;
# see samples/02-agents/tools/function_tool_with_approval.py
# and samples/02-agents/tools/function_tool_with_approval_and_sessions.py.
@tool(approval_mode="never_require")
def get_weather(
    location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
    """Get the weather for a given location."""
    conditions = ["sunny", "cloudy", "rainy", "stormy"]
    return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."


async def weather_override_middleware(context: ChatContext, call_next: Callable[[], Awaitable[None]]) -> None:
    """Chat middleware that overrides weather results for both streaming and non-streaming cases."""
    # Let the original agent execution complete first
    await call_next()

    # Check if there's a result to override (agent called weather function)
    if context.result is not None:
        # Create custom weather message
        chunks = [
            "due to special atmospheric conditions, ",
            "all locations are experiencing perfect weather today! ",
            "Temperature is a comfortable 22°C with gentle breezes. ",
            "Perfect day for outdoor activities!",
        ]

        if context.stream and isinstance(context.result, ResponseStream):

            async def _override_stream() -> AsyncIterable[ChatResponseUpdate]:
                for i, chunk_text in enumerate(chunks):
                    yield ChatResponseUpdate(
                        contents=[Content.from_text(text=f"Weather Advisory: [{i}] {chunk_text}")],
                        role="assistant",
                    )

            context.result = ResponseStream(_override_stream())
        else:
            # For non-streaming: just replace with a new message
            current_text = context.result.text if isinstance(context.result, ChatResponse) else ""
            custom_message = f"Weather Advisory: [0] {''.join(chunks)} Original message was: {current_text}"
            context.result = ChatResponse(messages=[Message(role="assistant", text=custom_message)])


async def validate_weather_middleware(context: ChatContext, call_next: Callable[[], Awaitable[None]]) -> None:
    """Chat middleware that simulates result validation for both streaming and non-streaming cases."""
    await call_next()

    validation_note = "Validation: weather data verified."

    if context.result is None:
        return

    if context.stream and isinstance(context.result, ResponseStream):

        def _append_validation_note(response: ChatResponse) -> ChatResponse:
            response.messages.append(Message(role="assistant", text=validation_note))
            return response

        context.result.with_finalizer(_append_validation_note)
    elif isinstance(context.result, ChatResponse):
        context.result.messages.append(Message(role="assistant", text=validation_note))


async def agent_cleanup_middleware(context: AgentContext, call_next: Callable[[], Awaitable[None]]) -> None:
    """Agent middleware that validates chat middleware effects and cleans the result."""
    await call_next()

    if context.result is None:
        return

    validation_note = "Validation: weather data verified."

    state = {"found_prefix": False}

    def _sanitize(response: AgentResponse) -> AgentResponse:
        found_prefix = state["found_prefix"]
        found_validation = False
        cleaned_messages: list[Message] = []

        for message in response.messages:
            text = message.text
            if text is None:
                cleaned_messages.append(message)
                continue

            if validation_note in text:
                found_validation = True
                text = text.replace(validation_note, "").strip()
                if not text:
                    continue

            if "Weather Advisory:" in text:
                found_prefix = True
                text = text.replace("Weather Advisory:", "")

            text = re.sub(r"\[\d+\]\s*", "", text)

            cleaned_messages.append(
                Message(
                    role=message.role,
                    text=text.strip(),
                    author_name=message.author_name,
                    message_id=message.message_id,
                    additional_properties=message.additional_properties,
                    raw_representation=message.raw_representation,
                )
            )

        if not found_prefix:
            raise RuntimeError("Expected chat middleware prefix not found in agent response.")
        if not found_validation:
            raise RuntimeError("Expected validation note not found in agent response.")

        cleaned_messages.append(Message(role="assistant", text=" Agent: OK"))
        response.messages = cleaned_messages
        return response

    if context.stream and isinstance(context.result, ResponseStream):

        def _clean_update(update: AgentResponseUpdate) -> AgentResponseUpdate:
            for content in update.contents or []:
                if not content.text:
                    continue
                text = content.text
                if "Weather Advisory:" in text:
                    state["found_prefix"] = True
                    text = text.replace("Weather Advisory:", "")
                text = re.sub(r"\[\d+\]\s*", "", text)
                content.text = text
            return update

        context.result.with_transform_hook(_clean_update)
        context.result.with_finalizer(_sanitize)
    elif isinstance(context.result, AgentResponse):
        context.result = _sanitize(context.result)


async def main() -> None:
    """Example demonstrating result override with middleware for both streaming and non-streaming."""
    print("=== Result Override Middleware Example ===")

    # OpenAIResponsesClient reads its settings (such as the API key) from environment variables,
    # which load_dotenv() above can populate from a local .env file.
    # The chat middleware list is ordered outermost first: validate_weather_middleware wraps
    # weather_override_middleware, so the validation note is appended to the overridden result.
    agent = OpenAIResponsesClient(
        middleware=[validate_weather_middleware, weather_override_middleware],
    ).as_agent(
        name="WeatherAgent",
        instructions="You are a helpful weather assistant. Use the weather tool to get current conditions.",
        tools=get_weather,
        middleware=[agent_cleanup_middleware],
    )

    # Non-streaming example
    print("\n--- Non-streaming Example ---")
    query = "What's the weather like in Seattle?"
    print(f"User: {query}")
    result = await agent.run(query)
    print(f"Agent: {result}")

    # Streaming example
    print("\n--- Streaming Example ---")
    query = "What's the weather like in Portland?"
    print(f"User: {query}")
    print("Agent: ", end="", flush=True)
    response = agent.run(query, stream=True)
    async for chunk in response:
        if chunk.text:
            print(chunk.text, end="", flush=True)
    print("\n")
    print(f"Final Result: {(await response.get_final_response()).text}")


if __name__ == "__main__":
    asyncio.run(main())
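
Supplementary sketch (not part of the sample file): the override pattern used above can be tried without an API key by stripping it down to plain asyncio. The block below is a minimal illustration under stated assumptions, not the framework's implementation. SketchContext, run_pipeline, handler, override, and cleanup are hypothetical names, and the chat and agent middleware layers are merged into a single list for brevity. It shows the same three ideas: call call_next() first, replace context.result afterwards, and swap in an async generator when context.stream is set.

```python
import asyncio
import re
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any


@dataclass
class SketchContext:
    """Hypothetical stand-in for the framework's context object."""

    stream: bool = False
    result: Any = None


async def run_pipeline(context: SketchContext, middleware: list, handler) -> None:
    """Run middleware in list order (first entry is outermost) around the handler."""

    async def invoke(index: int) -> None:
        if index == len(middleware):
            await handler(context)
            return
        await middleware[index](context, lambda: invoke(index + 1))

    await invoke(0)


async def handler(context: SketchContext) -> None:
    """Pretend model call: produces a string or an async generator of chunks."""
    if context.stream:

        async def _original():
            yield "rainy, "
            yield "12°C"

        context.result = _original()
    else:
        context.result = "rainy, 12°C"


async def override(context: SketchContext, call_next: Callable[[], Awaitable[None]]) -> None:
    """Let the handler run, then replace whatever it produced."""
    await call_next()
    chunks = ["perfect weather ", "everywhere"]
    if context.stream:

        async def _override():
            for i, chunk in enumerate(chunks):
                yield f"Advisory: [{i}] {chunk}"

        context.result = _override()
    else:
        context.result = f"Advisory: [0] {''.join(chunks)}"


def _clean(text: str) -> str:
    """Drop the prefix and the [n] chunk markers added by the override."""
    return re.sub(r"\[\d+\]\s*", "", text.replace("Advisory:", "")).lstrip()


async def cleanup(context: SketchContext, call_next: Callable[[], Awaitable[None]]) -> None:
    """Outer middleware: post-process the overridden result."""
    await call_next()
    if context.stream:
        inner = context.result

        async def _cleaned():
            async for chunk in inner:
                yield _clean(chunk)

        context.result = _cleaned()
    else:
        context.result = _clean(context.result)


async def run_once(stream: bool) -> str:
    context = SketchContext(stream=stream)
    await run_pipeline(context, [cleanup, override], handler)
    if stream:
        return "".join([chunk async for chunk in context.result])
    return context.result


non_streaming = asyncio.run(run_once(False))
streaming = asyncio.run(run_once(True))
print(non_streaming)
print(streaming)
```

Because each middleware only touches the result after call_next() returns, the list order decides who sees what: cleanup is outermost here, so it receives the already overridden text, which mirrors how agent_cleanup_middleware observes the output of the chat middleware in the sample.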