Fix handoff workflow context management and improve AG-UI demo (#5136)

This commit is contained in:
Evan Mattson
2026-04-08 13:08:24 +09:00
committed by GitHub
Unverified
parent f94a75daa5
commit e10d448ae2
19 changed files with 601 additions and 252 deletions
@@ -80,6 +80,7 @@ def create_agents(client: FoundryChatClient) -> tuple[Agent, Agent, Agent, Agent
"based on the problem described."
),
name="triage_agent",
require_per_service_call_history_persistence=True,
)
# Refund specialist: Handles refund requests
@@ -89,6 +90,7 @@ def create_agents(client: FoundryChatClient) -> tuple[Agent, Agent, Agent, Agent
name="refund_agent",
# In a real application, an agent can have multiple tools; here we keep it simple
tools=[process_refund],
require_per_service_call_history_persistence=True,
)
# Order/shipping specialist: Resolves delivery issues
@@ -98,6 +100,7 @@ def create_agents(client: FoundryChatClient) -> tuple[Agent, Agent, Agent, Agent
name="order_agent",
# In a real application, an agent can have multiple tools; here we keep it simple
tools=[check_order_status],
require_per_service_call_history_persistence=True,
)
# Return specialist: Handles return requests
@@ -107,6 +110,7 @@ def create_agents(client: FoundryChatClient) -> tuple[Agent, Agent, Agent, Agent
name="return_agent",
# In a real application, an agent can have multiple tools; here we keep it simple
tools=[process_return],
require_per_service_call_history_persistence=True,
)
return triage_agent, refund_agent, order_agent, return_agent
@@ -85,6 +85,8 @@ from agent_framework.orchestrations import (
**Handoff workflow tip**: Handoff workflows maintain the full conversation history including any `Message.additional_properties` emitted by your agents. This ensures routing metadata remains intact across all agent transitions. For specialist-to-specialist handoffs, use `.add_handoff(source, targets)` to configure which agents can route to which others with a fluent, type-safe API.
**Handoff `require_per_service_call_history_persistence`**: All agents in a handoff workflow **must** set `require_per_service_call_history_persistence=True`. `HandoffBuilder.build()` will raise a `ValueError` if any participant is missing this flag. This is required because handoff middleware short-circuits tool calls via `MiddlewareTermination`, and without per-service-call history persistence, local history would store tool results the service never received, causing mismatches on subsequent turns.
**Sequential orchestration note**: Sequential orchestration uses a few small adapter nodes for plumbing:
- `input-conversation` normalizes input to `list[Message]`
- `to-conversation:<participant>` converts agent responses into the shared conversation
@@ -53,6 +53,7 @@ def create_agents(
"Assign the two tasks to the appropriate specialists, one after the other."
),
name="coordinator",
require_per_service_call_history_persistence=True,
)
research_agent = Agent(
@@ -66,6 +67,7 @@ def create_agents(
"coordinator. Keep each individual response focused on one aspect."
),
name="research_agent",
require_per_service_call_history_persistence=True,
)
summary_agent = Agent(
@@ -75,6 +77,7 @@ def create_agents(
"control to the coordinator."
),
name="summary_agent",
require_per_service_call_history_persistence=True,
)
return coordinator, research_agent, summary_agent
@@ -77,6 +77,7 @@ def create_agents(client: FoundryChatClient) -> tuple[Agent, Agent, Agent, Agent
"based on the problem described."
),
name="triage_agent",
require_per_service_call_history_persistence=True,
)
# Refund specialist: Handles refund requests
@@ -86,6 +87,7 @@ def create_agents(client: FoundryChatClient) -> tuple[Agent, Agent, Agent, Agent
name="refund_agent",
# In a real application, an agent can have multiple tools; here we keep it simple
tools=[process_refund],
require_per_service_call_history_persistence=True,
)
# Order/shipping specialist: Resolves delivery issues
@@ -95,6 +97,7 @@ def create_agents(client: FoundryChatClient) -> tuple[Agent, Agent, Agent, Agent
name="order_agent",
# In a real application, an agent can have multiple tools; here we keep it simple
tools=[check_order_status],
require_per_service_call_history_persistence=True,
)
# Return specialist: Handles return requests
@@ -104,6 +107,7 @@ def create_agents(client: FoundryChatClient) -> tuple[Agent, Agent, Agent, Agent
name="return_agent",
# In a real application, an agent can have multiple tools; here we keep it simple
tools=[process_return],
require_per_service_call_history_persistence=True,
)
return triage_agent, refund_agent, order_agent, return_agent
@@ -105,6 +105,7 @@ async def main() -> None:
"When the user asks to create or generate files, hand off to code_specialist "
"by calling handoff_to_code_specialist."
),
require_per_service_call_history_persistence=True,
)
code_interpreter_tool = client.get_code_interpreter_tool()
@@ -117,6 +118,7 @@ async def main() -> None:
"and create files when requested. Always save files to /mnt/data/ directory."
),
tools=[code_interpreter_tool],
require_per_service_call_history_persistence=True,
)
workflow = (
@@ -71,6 +71,7 @@ def create_agents(client: FoundryChatClient) -> tuple[Agent, Agent, Agent]:
"if they need refund help or order tracking. Use handoff_to_refund_agent or "
"handoff_to_order_agent to transfer them."
),
require_per_service_call_history_persistence=True,
)
refund = Agent(
@@ -83,6 +84,7 @@ def create_agents(client: FoundryChatClient) -> tuple[Agent, Agent, Agent]:
"to record the request before continuing."
),
tools=[submit_refund],
require_per_service_call_history_persistence=True,
)
order = Agent(
@@ -92,6 +94,7 @@ def create_agents(client: FoundryChatClient) -> tuple[Agent, Agent, Agent]:
"You are an order tracking specialist. Help customers track their orders. "
"Ask for order numbers and provide shipping updates."
),
require_per_service_call_history_persistence=True,
)
return triage, refund, order
@@ -16,6 +16,10 @@ It includes:
The backend uses Azure OpenAI responses and supports intent-driven, non-linear handoff routing.
This demo keeps workflow state per `thread_id`. When the assistant ends a case with `Case complete.`, the UI blocks
later top-level input on that same thread and asks the user to start a new case explicitly instead of resuming a
terminated workflow.
## Folder Layout
- `backend/server.py` - FastAPI + AG-UI endpoint + Handoff workflow
@@ -81,6 +85,28 @@ VITE_BACKEND_URL=http://127.0.0.1:8891 npm run dev
7. When replacement is requested, wait for the `submit_replacement` reviewer interrupt and approve/reject it.
8. If you asked for refund-only, the flow should close without replacement/shipping prompts.
9. Confirm the case snapshot updates and workflow completion.
10. After the case closes, another top-level message on the same thread is rejected with a notice.
11. Click **Start New Case** to begin a fresh thread.
## Important: `require_per_service_call_history_persistence`
All agents participating in a handoff workflow **must** be constructed with
`require_per_service_call_history_persistence=True`. The `HandoffBuilder` will
raise a `ValueError` at build time if any participant is missing this flag.
**Why this is required:** Handoff workflows use middleware that short-circuits
tool calls via `MiddlewareTermination` when a handoff tool is invoked. Without
per-service-call history persistence, local history providers would persist tool
results that the service never received, causing call/result mismatches on
subsequent turns.
```python
agent = Agent(
client=client,
name="my_agent",
require_per_service_call_history_persistence=True, # Required for handoff
)
```
## What This Validates
@@ -17,12 +17,17 @@ import logging
import logging.handlers
import os
import random
from collections.abc import AsyncGenerator
from typing import Any
import uvicorn
from agent_framework import (
Agent,
Message,
Workflow,
WorkflowBuilder,
WorkflowContext,
executor,
tool,
)
from agent_framework.ag_ui import AgentFrameworkWorkflow, add_agent_framework_fastapi_endpoint
@@ -101,6 +106,7 @@ def create_agents() -> tuple[Agent, Agent, Agent]:
"4. If the issue is fully resolved, send a concise wrap-up that ends with exactly: Case complete."
),
client=client,
require_per_service_call_history_persistence=True,
)
refund = Agent(
@@ -126,6 +132,7 @@ def create_agents() -> tuple[Agent, Agent, Agent]:
),
client=client,
tools=[lookup_order_details, submit_refund],
require_per_service_call_history_persistence=True,
)
order = Agent(
@@ -149,19 +156,25 @@ def create_agents() -> tuple[Agent, Agent, Agent]:
),
client=client,
tools=[lookup_order_details, submit_replacement],
require_per_service_call_history_persistence=True,
)
return triage, refund, order
def is_case_complete_text(text: str) -> bool:
    """Report whether *text* finishes with the demo's completion marker.

    Surrounding whitespace is ignored and the comparison is case-insensitive,
    so ``"...  CASE COMPLETE.\n"`` counts as a completed case.
    """
    completion_marker = "case complete."
    normalized = text.strip().lower()
    return normalized.endswith(completion_marker)
def _termination_condition(conversation: list[Message]) -> bool:
    """Stop when any assistant emits an explicit completion marker.

    Args:
        conversation: Messages exchanged so far, oldest first.

    Returns:
        True if at least one assistant message ends with the completion
        marker recognised by ``is_case_complete_text``; otherwise False.
    """
    # Walk newest-first: the marker is normally in the latest assistant reply,
    # so this short-circuits quickly on long conversations.
    for message in reversed(conversation):
        if message.role != "assistant":
            continue
        # ``message.text`` may be empty/None for tool-only messages; treat that as "".
        if is_case_complete_text(message.text or ""):
            return True
    return False
@@ -215,6 +228,71 @@ def create_handoff_workflow() -> Workflow:
return builder.with_start_agent(triage).build()
def create_closed_case_notice_workflow() -> Workflow:
    """Build a one-executor workflow that tells the user a finished case cannot continue.

    The single executor discards whatever message it is started with and
    yields a fixed notice string as the workflow output.
    """
    notice_text = "Your case is complete, but you're trying to do something new. Please start a new thread."

    @executor(id="closed_case_notice")
    async def closed_case_notice(message: Message | None, ctx: WorkflowContext[None, str]) -> None:
        # The incoming message is irrelevant: the reply is always the same notice.
        del message
        await ctx.yield_output(notice_text)

    builder = WorkflowBuilder(start_executor=closed_case_notice)
    return builder.build()
class DemoHandoffWorkflow(AgentFrameworkWorkflow):
    """Handoff-workflow wrapper that refuses new top-level input on finished threads.

    A thread is considered finished once a streamed assistant text message ends
    with the completion marker checked by ``is_case_complete_text``. Later runs
    on that thread that carry messages but no resume payload are answered by a
    small notice workflow instead of the handoff workflow.
    """

    def __init__(self) -> None:
        super().__init__(
            workflow_factory=lambda _thread_id: create_handoff_workflow(),
            name="ag_ui_handoff_workflow_demo",
            description="Dynamic handoff workflow demo with tool approvals and request_info resumes.",
        )
        # Thread ids whose case has already been closed by a completion marker.
        self._completed_threads: set[str] = set()
        # Separate runner used only to emit the "case is closed" notice.
        self._closed_case_notice_runner = AgentFrameworkWorkflow(workflow=create_closed_case_notice_workflow())

    async def run(self, input_data: dict[str, Any]) -> AsyncGenerator[Any]:
        """Stream workflow events, diverting finished threads to the closed-case notice.

        Args:
            input_data: Run payload; this method reads its ``messages`` and
                ``resume`` entries and derives the thread id from it.

        Yields:
            Every event produced by the underlying runner, unmodified.
        """
        thread_id = self._thread_id_from_input(input_data)
        incoming_messages = input_data.get("messages")
        carries_messages = isinstance(incoming_messages, list) and len(incoming_messages) > 0
        is_resume = input_data.get("resume") is not None

        # New top-level input on a closed thread: answer with the notice and stop.
        if thread_id in self._completed_threads and carries_messages and not is_resume:
            async for notice_event in self._closed_case_notice_runner.run(input_data):
                yield notice_event
            return

        text_event_types = ("TEXT_MESSAGE_START", "TEXT_MESSAGE_CONTENT", "TEXT_MESSAGE_END")
        streamed_text: dict[str, str] = {}
        marker_seen = False

        async for event in super().run(input_data):
            kind = getattr(event, "type", None)
            if kind in text_event_types:
                message_id = getattr(event, "message_id", None)
                if isinstance(message_id, str):
                    if kind == "TEXT_MESSAGE_START":
                        # Begin (or restart) accumulation for this message.
                        streamed_text[message_id] = ""
                    elif kind == "TEXT_MESSAGE_CONTENT":
                        delta = getattr(event, "delta", None)
                        if isinstance(delta, str):
                            streamed_text[message_id] = streamed_text.get(message_id, "") + delta
                    elif is_case_complete_text(streamed_text.pop(message_id, "")):
                        # TEXT_MESSAGE_END: the fully assembled text ends with the marker.
                        marker_seen = True
            yield event

        if not marker_seen:
            return

        # Remember the closed thread and drop its workflow only after the whole
        # stream has been forwarded to the caller.
        self._completed_threads.add(thread_id)
        self.clear_thread_workflow(thread_id)
def create_app() -> FastAPI:
"""Create and configure the FastAPI application."""
@@ -231,11 +309,7 @@ def create_app() -> FastAPI:
allow_headers=["*"],
)
demo_workflow = AgentFrameworkWorkflow(
workflow_factory=lambda _thread_id: create_handoff_workflow(),
name="ag_ui_handoff_workflow_demo",
description="Dynamic handoff workflow demo with tool approvals and request_info resumes.",
)
demo_workflow = DemoHandoffWorkflow()
add_agent_framework_fastapi_endpoint(
app=app,
@@ -54,6 +54,16 @@ const STARTER_PROMPTS = [
"Help me with a damaged-order refund and replacement.",
];
/**
 * Placeholder case snapshot shown before any case details have been captured.
 * Used as the initial `caseSnapshot` state and restored when a new case is started.
 */
const DEFAULT_CASE_SNAPSHOT: CaseSnapshot = {
orderId: "Not captured",
refundAmount: "Not captured",
refundApproved: "pending",
shippingPreference: "Not selected",
};
/** System notice shown when the user tries to send a new message on a case that is already closed. */
const CLOSED_CASE_NOTICE =
"This case is already complete. Start a new case to open a fresh thread for a new request.";
function randomId(): string {
if (typeof crypto !== "undefined" && typeof crypto.randomUUID === "function") {
return crypto.randomUUID();
@@ -213,6 +223,10 @@ function normalizeTextForDedupe(text: string): string {
return text.replace(/\s+/g, " ").trim();
}
/**
 * Returns true when `text` ends with the "Case complete." marker,
 * ignoring surrounding whitespace and letter case.
 */
function isCaseCompleteText(text: string): boolean {
  const completionMarker = "case complete.";
  const normalized = text.trim().toLowerCase();
  return normalized.endsWith(completionMarker);
}
function normalizeShippingPreference(text: string): string | null {
const normalized = text.trim().toLowerCase();
if (normalized.length === 0) {
@@ -263,24 +277,21 @@ export default function App(): JSX.Element {
const assistantMessageIndexRef = useRef<Record<string, number>>({});
const activeRunIdRef = useRef<string | null>(null);
const pendingUsageRef = useRef<UsageDiagnostics | null>(null);
const caseClosedRef = useRef<boolean>(false);
const [messages, setMessages] = useState<DisplayMessage[]>([]);
const [requestInfoById, setRequestInfoById] = useState<Record<string, RequestInfoPayload>>({});
const [pendingInterrupts, setPendingInterrupts] = useState<Interrupt[]>([]);
const [activeAgent, setActiveAgent] = useState<AgentId>("triage_agent");
const [visitedAgents, setVisitedAgents] = useState<Set<AgentId>>(new Set(["triage_agent"]));
const [caseSnapshot, setCaseSnapshot] = useState<CaseSnapshot>({
orderId: "Not captured",
refundAmount: "Not captured",
refundApproved: "pending",
shippingPreference: "Not selected",
});
const [caseSnapshot, setCaseSnapshot] = useState<CaseSnapshot>(DEFAULT_CASE_SNAPSHOT);
const [statusText, setStatusText] = useState<string>("Ready");
const [isRunning, setIsRunning] = useState<boolean>(false);
const [inputText, setInputText] = useState<string>("");
const [isApprovalModalOpen, setIsApprovalModalOpen] = useState<boolean>(false);
const [latestUsage, setLatestUsage] = useState<UsageDiagnostics | null>(null);
const [usageHistory, setUsageHistory] = useState<UsageDiagnostics[]>([]);
const [isCaseClosed, setIsCaseClosed] = useState<boolean>(false);
const currentInterrupt = pendingInterrupts[0];
const currentInterruptKind = currentInterrupt ? interruptKind(currentInterrupt) : "unknown";
@@ -288,6 +299,7 @@ export default function App(): JSX.Element {
const interruptPrompt = currentInterrupt
? extractPromptFromInterrupt(currentInterrupt, currentRequestInfo)
: "No pending interrupt.";
const canStartFreshCase = !currentInterrupt && isCaseClosed;
const functionCall = currentInterrupt ? extractFunctionCallFromInterrupt(currentInterrupt) : null;
const functionArguments = useMemo(() => parseFunctionArguments(functionCall), [functionCall]);
@@ -304,6 +316,34 @@ export default function App(): JSX.Element {
setMessages((prev) => [...prev, message]);
};
// Appends a system notice to the transcript, unless the most recent entry is
// already the same system notice (avoids stacking duplicates on repeated attempts).
const pushSystemNotice = (text: string): void => {
  setMessages((prev) => {
    const lastMessage = prev[prev.length - 1];
    const isRepeatedNotice = lastMessage?.role === "system" && lastMessage?.text === text;
    if (isRepeatedNotice) {
      return prev;
    }
    return [...prev, { id: randomId(), role: "system", text }];
  });
};
// Starts a fresh case: switches to a newly generated thread id and restores the
// refs and React state touched below to their initial values.
// NOTE(review): latestUsage / usageHistory are not reset here — confirm that
// keeping usage diagnostics across cases is intentional.
const resetConversationState = (): void => {
// New thread id so subsequent requests go to a fresh thread.
threadIdRef.current = randomId();
assistantMessageIndexRef.current = {};
activeRunIdRef.current = null;
pendingUsageRef.current = null;
// Clear the ref as well as the state flag below; both track the closed-case status.
caseClosedRef.current = false;
setMessages([]);
setRequestInfoById({});
setPendingInterrupts([]);
// Routing display goes back to the triage agent as the only visited agent.
setActiveAgent("triage_agent");
setVisitedAgents(new Set(["triage_agent"]));
setCaseSnapshot(DEFAULT_CASE_SNAPSHOT);
setStatusText("Ready");
setInputText("");
setIsApprovalModalOpen(false);
setIsCaseClosed(false);
};
const rebuildAssistantMessageIndex = (items: DisplayMessage[]): void => {
const next: Record<string, number> = {};
items.forEach((item, index) => {
@@ -364,6 +404,10 @@ export default function App(): JSX.Element {
}
const candidate = prev[index];
if (candidate.role === "user" || candidate.text.trim().length > 0) {
if (candidate.role === "assistant" && isCaseCompleteText(candidate.text)) {
caseClosedRef.current = true;
setIsCaseClosed(true);
}
return prev;
}
const next = prev.filter((item) => item.id !== messageId);
@@ -565,7 +609,9 @@ export default function App(): JSX.Element {
}
setPendingInterrupts(interruptPayload);
setStatusText(interruptPayload.length > 0 ? "Waiting for input" : "Run complete");
setStatusText(
interruptPayload.length > 0 ? "Waiting for input" : caseClosedRef.current ? "Case complete" : "Run complete"
);
setIsRunning(false);
break;
}
@@ -652,6 +698,12 @@ export default function App(): JSX.Element {
};
const startNewTurn = async (text: string): Promise<void> => {
if (caseClosedRef.current && pendingInterrupts.length === 0) {
pushSystemNotice(CLOSED_CASE_NOTICE);
setStatusText("Case complete");
return;
}
pushMessage({ id: randomId(), role: "user", text });
await runWithPayload({
@@ -873,7 +925,20 @@ export default function App(): JSX.Element {
<article className="card interrupt-card">
<h2>Pending Action</h2>
{!currentInterrupt && <p className="muted">No interrupt pending. Start with one of the prompts below.</p>}
{!currentInterrupt && (
<div className="pending-empty-state">
<p className="muted">
{isCaseClosed
? "This case is closed. New top-level messages on this thread are blocked until you start a new case."
: "No interrupt pending. Start with one of the prompts below."}
</p>
{canStartFreshCase && (
<button type="button" className="case-reset" onClick={resetConversationState} disabled={isRunning}>
Start New Case
</button>
)}
</div>
)}
{currentInterrupt && (
<div className="interrupt-body">
@@ -907,7 +972,7 @@ export default function App(): JSX.Element {
</div>
)}
{!currentInterrupt && (
{!currentInterrupt && !isCaseClosed && (
<div className="starter-prompts">
{STARTER_PROMPTS.map((prompt) => (
<button key={prompt} type="button" onClick={() => void startNewTurn(prompt)} disabled={isRunning}>
@@ -944,7 +1009,9 @@ export default function App(): JSX.Element {
? "Waiting for reviewer approval..."
: currentInterruptKind === "handoff_input"
? "Reply to continue..."
: "Describe your issue..."
: isCaseClosed
? "This case is complete. Click Start New Case to open a fresh thread..."
: "Describe your issue..."
}
disabled={isRunning || currentInterruptKind === "approval"}
/>
@@ -297,6 +297,7 @@ body {
}
.approval-actions button,
.case-reset,
.starter-prompts button,
.chat-input button {
border: 0;
@@ -307,6 +308,7 @@ body {
}
.approval-actions button:disabled,
.case-reset:disabled,
.starter-prompts button:disabled,
.chat-input button:disabled {
opacity: 0.6;
@@ -391,6 +393,19 @@ body {
gap: 10px;
}
/* Wrapper for the "no pending interrupt" message and the Start New Case button: stacked with a 10px gap. */
.pending-empty-state {
display: grid;
gap: 10px;
}
/* "Start New Case" button: sized to its content, with its own border, background and text colour. */
.case-reset {
width: fit-content;
border: 1px solid #bdcfdc;
background: #ecf3f8;
color: #345267;
padding: 10px 14px;
}
.starter-prompts button {
text-align: left;
background: linear-gradient(125deg, #fff8ef 0%, #ffe7cf 100%);
@@ -121,6 +121,7 @@ async def run_agent_framework() -> None:
"- For technical issues: call handoff_to_technical_support"
),
description="Routes requests to appropriate specialists",
require_per_service_call_history_persistence=True,
)
# Create billing specialist
@@ -128,6 +129,7 @@ async def run_agent_framework() -> None:
name="billing_agent",
instructions="You are a billing specialist. Help with payment and billing questions. Provide clear assistance.",
description="Handles billing and payment questions",
require_per_service_call_history_persistence=True,
)
# Create technical support specialist
@@ -135,6 +137,7 @@ async def run_agent_framework() -> None:
name="technical_support",
instructions="You are technical support. Help with technical issues. Provide clear assistance.",
description="Handles technical support questions",
require_per_service_call_history_persistence=True,
)
# Create handoff workflow - simpler configuration
@@ -194,6 +194,7 @@ def _create_af_agents(client: OpenAIChatCompletionClient):
"- handoff_to_order_status_agent for shipping/timeline questions\n"
"- handoff_to_order_return_agent for returns"
),
require_per_service_call_history_persistence=True,
)
refund = Agent(
client=client,
@@ -201,6 +202,7 @@ def _create_af_agents(client: OpenAIChatCompletionClient):
instructions=(
"Handle refunds. Ask for order id and reason. If shipping info is needed, hand off to order_status_agent."
),
require_per_service_call_history_persistence=True,
)
status = Agent(
client=client,
@@ -208,6 +210,7 @@ def _create_af_agents(client: OpenAIChatCompletionClient):
instructions=(
"Provide order status, tracking, and timelines. If billing questions appear, hand off to refund_agent."
),
require_per_service_call_history_persistence=True,
)
returns = Agent(
client=client,
@@ -215,6 +218,7 @@ def _create_af_agents(client: OpenAIChatCompletionClient):
instructions=(
"Coordinate returns, confirm addresses, and summarize next steps. Hand off to triage_agent if unsure."
),
require_per_service_call_history_persistence=True,
)
return triage, refund, status, returns