* [BREAKING] Rename ChatAgent -> Agent, ChatMessage -> Message, ChatClientProtocol -> SupportsChatGetResponse Simplify the public API by removing redundant 'Chat' prefix from core types: - ChatAgent -> Agent - RawChatAgent -> RawAgent - ChatMessage -> Message - ChatClientProtocol -> SupportsChatGetResponse Also renamed internal WorkflowMessage (was Message in _runner_context) to avoid collision. No backward compatibility aliases - this is a clean breaking change. * [BREAKING] Rename Agent chat_client parameter to client * Fix rebase issues: WorkflowMessage references and broken markdown links * Fix formatting and lint issues from code quality checks * Fix import ordering in workflow sample files * fixed rebase * Fix test failures: use WorkflowMessage and A2AMessage after ChatMessage→Message rename - Replace Message(data=..., source_id=...) with WorkflowMessage(...) in workflow tests - Fix isinstance check in A2A agent to use A2AMessage instead of Message - Fix import in test_workflow_observability.py (Message→WorkflowMessage) * Fix lint, fmt, and sample errors after ChatMessage→Message rename - Auto-fix 70+ ruff lint issues across samples (ChatMessage→Message refs) - Fix HostedVectorStoreContent→Content.from_hosted_vector_store in file search sample - Fix _normalize_messages→normalize_messages in custom agent sample - Fix context.terminate→raise MiddlewareTermination in middleware samples - Fix with_update_hook→with_transform_hook in override middleware sample - Add TOptions_co import back to custom_chat_client sample - Add noqa for FastAPI File() default in chatkit sample - Fix B023 loop variable capture in weather agent sample * fix: update Agent constructor calls from chat_client to client in declaration-only tool tests * fix: add register_cleanup to devui lazy-loading proxy and type stub * fixed tests and updated new pieces * fix agui typevar * fix merge errors * fix merge conflicts * fiux merge * Remove unused links --------- Co-authored-by: Evan Mattson <evan.mattson@microsoft.com>
Tools and Middleware: Request Flow Architecture
This document describes the complete request flow when using an Agent with middleware and tools, from the initial Agent.run() call through middleware layers, function invocation, and back to the caller.
Overview
The Agent Framework uses a layered architecture with three distinct middleware/processing layers:
- Agent Middleware Layer - Wraps the entire agent execution
- Chat Middleware Layer - Wraps calls to the chat client
- Function Middleware Layer - Wraps individual tool/function invocations
Each layer provides interception points where you can modify inputs, inspect outputs, or alter behavior.
Flow Diagram
sequenceDiagram
participant User
participant Agent as Agent.run()
participant AML as AgentMiddlewareLayer
participant AMP as AgentMiddlewarePipeline
participant RawAgent as RawAgent.run()
participant CML as ChatMiddlewareLayer
participant CMP as ChatMiddlewarePipeline
participant FIL as FunctionInvocationLayer
participant Client as BaseChatClient._inner_get_response()
participant LLM as LLM Service
participant FMP as FunctionMiddlewarePipeline
participant Tool as FunctionTool.invoke()
User->>Agent: run(messages, thread, options, middleware)
Note over Agent,AML: Agent Middleware Layer
Agent->>AML: run() with middleware param
AML->>AML: categorize_middleware() → split by type
AML->>AMP: execute(AgentContext)
loop Agent Middleware Chain
AMP->>AMP: middleware[i].process(context, call_next)
Note right of AMP: Can modify: messages, options, thread
end
AMP->>RawAgent: run() via final_handler
alt Non-Streaming (stream=False)
RawAgent->>RawAgent: _prepare_run_context() [async]
Note right of RawAgent: Builds: thread_messages, chat_options, tools
RawAgent->>CML: client.get_response(stream=False)
else Streaming (stream=True)
RawAgent->>RawAgent: ResponseStream.from_awaitable()
Note right of RawAgent: Defers async prep to stream consumption
RawAgent-->>User: Returns ResponseStream immediately
Note over RawAgent,CML: Async work happens on iteration
RawAgent->>RawAgent: _prepare_run_context() [deferred]
RawAgent->>CML: client.get_response(stream=True)
end
Note over CML,CMP: Chat Middleware Layer
CML->>CMP: execute(ChatContext)
loop Chat Middleware Chain
CMP->>CMP: middleware[i].process(context, call_next)
Note right of CMP: Can modify: messages, options
end
CMP->>FIL: get_response() via final_handler
Note over FIL,Tool: Function Invocation Loop
loop Max Iterations (default: 40)
FIL->>Client: _inner_get_response(messages, options)
Client->>LLM: API Call
LLM-->>Client: Response (may include tool_calls)
Client-->>FIL: ChatResponse
alt Response has function_calls
FIL->>FIL: _extract_function_calls()
FIL->>FIL: _try_execute_function_calls()
Note over FIL,Tool: Function Middleware Layer
loop For each function_call
FIL->>FMP: execute(FunctionInvocationContext)
loop Function Middleware Chain
FMP->>FMP: middleware[i].process(context, call_next)
Note right of FMP: Can modify: arguments
end
FMP->>Tool: invoke(arguments)
Tool-->>FMP: result
FMP-->>FIL: Content.from_function_result()
end
FIL->>FIL: Append tool results to messages
alt tool_choice == "required"
Note right of FIL: Return immediately with function call + result
FIL-->>CMP: ChatResponse
else tool_choice == "auto" or other
Note right of FIL: Continue loop for text response
end
else No function_calls
FIL-->>CMP: ChatResponse
end
end
CMP-->>CML: ChatResponse
Note right of CMP: Can observe/modify result
CML-->>RawAgent: ChatResponse / ResponseStream
alt Non-Streaming
RawAgent->>RawAgent: _finalize_response_and_update_thread()
else Streaming
Note right of RawAgent: .map() transforms updates
Note right of RawAgent: .with_result_hook() runs post-processing
end
RawAgent-->>AMP: AgentResponse / ResponseStream
Note right of AMP: Can observe/modify result
AMP-->>AML: AgentResponse
AML-->>Agent: AgentResponse
Agent-->>User: AgentResponse / ResponseStream
Layer Details
1. Agent Middleware Layer (AgentMiddlewareLayer)
Entry Point: Agent.run(messages, thread, options, middleware)
Context Object: AgentContext
| Field | Type | Description |
|---|---|---|
agent |
SupportsAgentRun |
The agent being invoked |
messages |
list[Message] |
Input messages (mutable) |
thread |
AgentThread | None |
Conversation thread |
options |
Mapping[str, Any] |
Chat options dict |
stream |
bool |
Whether streaming is enabled |
metadata |
dict |
Shared data between middleware |
result |
AgentResponse | None |
Set after call_next() is called |
kwargs |
Mapping[str, Any] |
Additional run arguments |
Key Operations:
categorize_middleware()separates middleware by type (agent, chat, function)- Chat and function middleware are forwarded to
client AgentMiddlewarePipeline.execute()runs the agent middleware chain- Final handler calls
RawAgent.run()
What Can Be Modified:
context.messages- Add, remove, or modify input messagescontext.options- Change model parameters, temperature, etc.context.thread- Replace or modify the threadcontext.result- Override the final response (aftercall_next())
2. Chat Middleware Layer (ChatMiddlewareLayer)
Entry Point: client.get_response(messages, options)
Context Object: ChatContext
| Field | Type | Description |
|---|---|---|
client |
SupportsChatGetResponse |
The chat client |
messages |
Sequence[Message] |
Messages to send |
options |
Mapping[str, Any] |
Chat options |
stream |
bool |
Whether streaming |
metadata |
dict |
Shared data between middleware |
result |
ChatResponse | None |
Set after call_next() is called |
kwargs |
Mapping[str, Any] |
Additional arguments |
Key Operations:
ChatMiddlewarePipeline.execute()runs the chat middleware chain- Final handler calls
FunctionInvocationLayer.get_response() - Stream hooks can be registered for streaming responses
What Can Be Modified:
context.messages- Inject system prompts, filter contentcontext.options- Change model, temperature, tool_choicecontext.result- Override the response (aftercall_next())
3. Function Invocation Layer (FunctionInvocationLayer)
Entry Point: FunctionInvocationLayer.get_response()
This layer manages the tool execution loop:
- Calls
BaseChatClient._inner_get_response()to get LLM response - Extracts function calls from the response
- Executes functions through the Function Middleware Pipeline
- Appends results to messages and loops back to step 1
Configuration: FunctionInvocationConfiguration
| Setting | Default | Description |
|---|---|---|
enabled |
True |
Enable auto-invocation |
max_iterations |
40 |
Maximum tool execution loops |
max_consecutive_errors_per_request |
3 |
Error threshold before stopping |
terminate_on_unknown_calls |
False |
Raise error for unknown tools |
additional_tools |
[] |
Extra tools to register |
include_detailed_errors |
False |
Include exceptions in results |
tool_choice Behavior:
The tool_choice option controls how the model uses available tools:
| Value | Behavior |
|---|---|
"auto" |
Model decides whether to call a tool or respond with text. After tool execution, the loop continues to get a text response. |
"none" |
Model is prevented from calling tools, will only respond with text. |
"required" |
Model must call a tool. After tool execution, returns immediately with the function call and result—no additional model call is made. |
{"mode": "required", "required_function_name": "fn"} |
Model must call the specified function. Same return behavior as "required". |
Why tool_choice="required" returns immediately:
When you set tool_choice="required", your intent is to force one or more tool calls (not all models supports multiple, either by name or when using required without a name). The framework respects this by:
- Getting the model's function call(s)
- Executing the tool(s)
- Returning the response(s) with both the function call message(s) and the function result(s)
This avoids an infinite loop (model forced to call tools → executes → model forced to call tools again) and gives you direct access to the tool result.
# With tool_choice="required", response contains function call + result only
response = await client.get_response(
"What's the weather?",
options={"tool_choice": "required", "tools": [get_weather]}
)
# response.messages contains:
# [0] Assistant message with function_call content
# [1] Tool message with function_result content
# (No text response from model)
# To get a text response after tool execution, use tool_choice="auto"
response = await client.get_response(
"What's the weather?",
options={"tool_choice": "auto", "tools": [get_weather]}
)
# response.text contains the model's interpretation of the weather data
4. Function Middleware Layer (FunctionMiddlewarePipeline)
Entry Point: Called per function invocation within _auto_invoke_function()
Context Object: FunctionInvocationContext
| Field | Type | Description |
|---|---|---|
function |
FunctionTool |
The function being invoked |
arguments |
BaseModel |
Validated Pydantic arguments |
metadata |
dict |
Shared data between middleware |
result |
Any |
Set after call_next() is called |
kwargs |
Mapping[str, Any] |
Runtime kwargs |
What Can Be Modified:
context.arguments- Modify validated arguments before executioncontext.result- Override the function result (aftercall_next())- Raise
MiddlewareTerminationto skip execution and terminate the function invocation loop
Special Behavior: When MiddlewareTermination is raised in function middleware, it signals that the function invocation loop should exit without making another LLM call. This is useful when middleware determines that no further processing is needed (e.g., a termination condition is met).
class TerminatingMiddleware(FunctionMiddleware):
async def process(self, context: FunctionInvocationContext, call_next):
if self.should_terminate(context):
context.result = "terminated by middleware"
raise MiddlewareTermination # Exit function invocation loop
await call_next(context)
Arguments Added/Altered at Each Layer
Agent Layer → Chat Layer
# RawAgent._prepare_run_context() builds:
{
"thread": AgentThread, # Validated/created thread
"input_messages": [...], # Normalized input messages
"thread_messages": [...], # Messages from thread + context + input
"agent_name": "...", # Agent name for attribution
"chat_options": {
"model_id": "...",
"conversation_id": "...", # From thread.service_thread_id
"tools": [...], # Normalized tools + MCP tools
"temperature": ...,
"max_tokens": ...,
# ... other options
},
"filtered_kwargs": {...}, # kwargs minus 'chat_options'
"finalize_kwargs": {...}, # kwargs with 'thread' added
}
Chat Layer → Function Layer
# Passed through to FunctionInvocationLayer:
{
"messages": [...], # Prepared messages
"options": {...}, # Mutable copy of chat_options
"function_middleware": [...], # Function middleware from kwargs
}
Function Layer → Tool Invocation
# FunctionInvocationContext receives:
{
"function": FunctionTool, # The tool to invoke
"arguments": BaseModel, # Validated from function_call.arguments
"kwargs": {
# Runtime kwargs (filtered, no conversation_id)
},
}
Tool Result → Back Up
# Content.from_function_result() creates:
{
"type": "function_result",
"call_id": "...", # From function_call.call_id
"result": ..., # Serialized tool output
"exception": "..." | None, # Error message if failed
}
Middleware Control Flow
There are three ways to exit a middleware's process() method:
1. Return Normally (with or without calling call_next)
Returns control to the upstream middleware, allowing its post-processing code to run.
class CachingMiddleware(FunctionMiddleware):
async def process(self, context: FunctionInvocationContext, call_next):
# Option A: Return early WITHOUT calling call_next (skip downstream)
if cached := self.cache.get(context.function.name):
context.result = cached
return # Upstream post-processing still runs
# Option B: Call call_next, then return normally
await call_next(context)
self.cache[context.function.name] = context.result
return # Normal completion
2. Raise MiddlewareTermination
Immediately exits the entire middleware chain. Upstream middleware's post-processing code is skipped.
class BlockedFunctionMiddleware(FunctionMiddleware):
async def process(self, context: FunctionInvocationContext, call_next):
if context.function.name in self.blocked_functions:
context.result = "Function blocked by policy"
raise MiddlewareTermination("Blocked") # Skips ALL post-processing
await call_next(context)
3. Raise Any Other Exception
Bubbles up to the caller. The middleware chain is aborted and the exception propagates.
class ValidationMiddleware(FunctionMiddleware):
async def process(self, context: FunctionInvocationContext, call_next):
if not self.is_valid(context.arguments):
raise ValueError("Invalid arguments") # Bubbles up to user
await call_next(context)
return vs raise MiddlewareTermination
The key difference is what happens to upstream middleware's post-processing:
class MiddlewareA(AgentMiddleware):
async def process(self, context, call_next):
print("A: before")
await call_next(context)
print("A: after") # Does this run?
class MiddlewareB(AgentMiddleware):
async def process(self, context, call_next):
print("B: before")
context.result = "early result"
# Choose one:
return # Option 1
# raise MiddlewareTermination() # Option 2
With middleware registered as [MiddlewareA, MiddlewareB]:
| Exit Method | Output |
|---|---|
return |
A: before → B: before → A: after |
raise MiddlewareTermination |
A: before → B: before (no A: after) |
Use return when you want upstream middleware to still process the result (e.g., logging, metrics).
Use raise MiddlewareTermination when you want to completely bypass all remaining processing (e.g., blocking a request, returning cached response without any modification).
Calling call_next() or Not
The decision to call call_next(context) determines whether downstream middleware and the actual operation execute:
Without calling call_next() - Skip downstream
async def process(self, context, call_next):
context.result = "replacement result"
return # Downstream middleware and actual execution are SKIPPED
- Downstream middleware: ❌ NOT executed
- Actual operation (LLM call, function invocation): ❌ NOT executed
- Upstream middleware post-processing: ✅ Still runs (unless
MiddlewareTerminationraised) - Result: Whatever you set in
context.result
With calling call_next() - Full execution
async def process(self, context, call_next):
# Pre-processing
await call_next(context) # Execute downstream + actual operation
# Post-processing (context.result now contains real result)
return
- Downstream middleware: ✅ Executed
- Actual operation: ✅ Executed
- Upstream middleware post-processing: ✅ Runs
- Result: The actual result (possibly modified in post-processing)
Summary Table
| Exit Method | Call call_next()? |
Downstream Executes? | Actual Op Executes? | Upstream Post-Processing? |
|---|---|---|---|---|
return (or implicit) |
Yes | ✅ | ✅ | ✅ Yes |
return |
No | ❌ | ❌ | ✅ Yes |
raise MiddlewareTermination |
No | ❌ | ❌ | ❌ No |
raise MiddlewareTermination |
Yes | ✅ | ✅ | ❌ No |
raise OtherException |
Either | Depends | Depends | ❌ No (exception propagates) |
Note: The first row (
returnafter callingcall_next()) is the default behavior. Python functions implicitly returnNoneat the end, so simply callingawait call_next(context)without an explicitreturnstatement achieves this pattern.
Streaming vs Non-Streaming
The run() method handles streaming and non-streaming differently:
Non-Streaming (stream=False)
Returns Awaitable[AgentResponse]:
async def _run_non_streaming():
ctx = await self._prepare_run_context(...) # Async preparation
response = await self.client.get_response(stream=False, ...)
await self._finalize_response_and_update_thread(...)
return AgentResponse(...)
Streaming (stream=True)
Returns ResponseStream[AgentResponseUpdate, AgentResponse] synchronously:
# Async preparation is deferred using ResponseStream.from_awaitable()
async def _get_stream():
ctx = await self._prepare_run_context(...) # Deferred until iteration
return self.client.get_response(stream=True, ...)
return (
ResponseStream.from_awaitable(_get_stream())
.map(
transform=map_chat_to_agent_update, # Transform each update
finalizer=self._finalize_response_updates, # Build final response
)
.with_result_hook(_post_hook) # Post-processing after finalization
)
Key points:
ResponseStream.from_awaitable()wraps an async function, deferring execution until the stream is consumed.map()transformsChatResponseUpdate→AgentResponseUpdateand provides the finalizer.with_result_hook()runs after finalization (e.g., notify thread of new messages)
See Also
- Middleware Samples - Examples of custom middleware
- Function Tool Samples - Creating and using tools