Python: Emit TOOL_CALL_RESULT events when resuming after tool approval (#4758)

* Emit TOOL_CALL_RESULT events on approval resume (#4589)

When a tool call is approved via the interrupt/resume flow,
_resolve_approval_responses executes the tool and injects the result
into the messages array, but no TOOL_CALL_RESULT SSE event was yielded
to the client.

Changes:
- _resolve_approval_responses now returns the list of resolved
  function_result Content objects instead of None
- run_agent_stream yields ToolCallResultEvent for each resolved
  approval result after RunStartedEvent is emitted
- Add ToolCallResultEvent to ag_ui.core imports in _agent_run.py

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Apply pre-commit auto-fixes

* fix(ag-ui): address PR review feedback for #4589

1. _resolve_approval_responses now returns only approved results (not
   rejections) so TOOL_CALL_RESULT events are emitted only for executed
   tools. Rejection results are still written into message history.

2. Emit resolved TOOL_CALL_RESULT events in the no-updates fallback
   RUN_STARTED path so approval results are never lost.

3. Rewrite tests to use real FunctionTool with func and
   approval_mode='always_require' via StubAgent default_options,
   verifying actual tool execution output in TOOL_CALL_RESULT content.
   Added test for rejection not emitting TOOL_CALL_RESULT.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix #4589: clean up approval resolution and add missing tests

- Extract duplicated TOOL_CALL_RESULT emission block into
  _make_approval_tool_result_events helper to prevent drift
- Remove dead rejection_results construction in _resolve_approval_responses;
  _replace_approval_contents_with_results already handles rejections inline
- Pass only approved_results (not all_results) to
  _replace_approval_contents_with_results to clarify the contract
- Add mixed approve/reject test validating the core splitting logic
- Add zero-updates test covering the no-updates fallback emission path
- Add direct unit test for _resolve_approval_responses return value

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Apply pre-commit auto-fixes

* Fix import sorting lint error in test_approval_result_event.py

Add blank line between first-party and third-party import groups
to satisfy ruff I001 rule.

Fixes #4589

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <copilot@github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Evan Mattson
2026-03-20 09:41:46 +09:00
committed by GitHub
Unverified
parent 4afc088f01
commit cefda44283
2 changed files with 492 additions and 17 deletions
@@ -21,6 +21,7 @@ from ag_ui.core import (
TextMessageStartEvent,
ToolCallArgsEvent,
ToolCallEndEvent,
ToolCallResultEvent,
ToolCallStartEvent,
)
from agent_framework import (
@@ -369,6 +370,24 @@ def _handle_step_based_approval(messages: list[Any]) -> list[BaseEvent]:
return events
def _make_approval_tool_result_events(resolved_approval_results: list[Content]) -> list[ToolCallResultEvent]:
    """Convert resolved approval results into TOOL_CALL_RESULT events.

    Args:
        resolved_approval_results: function_result contents produced while
            resolving approval responses.

    Returns:
        One ``ToolCallResultEvent`` per entry that has a truthy ``call_id``,
        in input order. Entries without a ``call_id`` are skipped.
    """
    tool_events: list[ToolCallResultEvent] = []
    for item in resolved_approval_results:
        if not item.call_id:
            # No call id means there is nothing to put in tool_call_id.
            continue

        payload = item.result
        if payload is None:
            payload = ""
        # Strings are forwarded untouched; anything else is JSON-encoded.
        if not isinstance(payload, str):
            payload = json.dumps(make_json_safe(payload))

        result_event = ToolCallResultEvent(
            message_id=generate_event_id(),
            tool_call_id=item.call_id,
            content=payload,
            role="tool",
        )
        tool_events.append(result_event)
    return tool_events
def _evict_oldest_approvals(registry: dict[str, str], max_size: int = 10_000) -> None:
"""Evict the oldest entries from the pending-approvals registry (LRU).
@@ -391,7 +410,7 @@ async def _resolve_approval_responses(
run_kwargs: dict[str, Any],
pending_approvals: dict[str, str] | None = None,
thread_id: str = "",
) -> None:
) -> list[Content]:
"""Execute approved function calls and replace approval content with results.
This modifies the messages list in place, replacing function_approval_response
@@ -407,10 +426,16 @@ async def _resolve_approval_responses(
When provided, every approval response is validated against this
registry to prevent bypass, function name spoofing, and replay.
thread_id: The conversation thread ID used to scope registry keys.
Returns:
List of approved function_result Content objects only (empty if no
approvals). Rejection results are written into the message history
but are *not* included in the return value because they should not
be emitted as TOOL_CALL_RESULT events.
"""
fcc_todo = _collect_approval_responses(messages)
if not fcc_todo:
return
return []
approved_responses = [resp for resp in fcc_todo.values() if resp.approved]
rejected_responses = [resp for resp in fcc_todo.values() if not resp.approved]
@@ -493,31 +518,23 @@ async def _resolve_approval_responses(
logger.exception("Failed to execute approved tool calls; injecting error results: %s", e)
approved_function_results = []
# Build normalized results for approved responses
normalized_results: list[Content] = []
# Build results for approved responses (used for TOOL_CALL_RESULT event emission)
approved_results: list[Content] = []
for idx, approval in enumerate(approved_responses):
if (
idx < len(approved_function_results)
and getattr(approved_function_results[idx], "type", None) == "function_result"
):
normalized_results.append(approved_function_results[idx])
approved_results.append(approved_function_results[idx])
continue
# Get call_id from function_call if present, otherwise use approval.id
func_call = approval.function_call
call_id = (func_call.call_id if func_call else None) or approval.id or ""
normalized_results.append(
approved_results.append(
Content.from_function_result(call_id=call_id, result="Error: Tool call invocation failed.")
)
# Build rejection results
for rejection in rejected_responses:
func_call = rejection.function_call
call_id = (func_call.call_id if func_call else None) or rejection.id or ""
normalized_results.append(
Content.from_function_result(call_id=call_id, result="Error: Tool call invocation was rejected by user.")
)
_replace_approval_contents_with_results(messages, fcc_todo, normalized_results) # type: ignore
_replace_approval_contents_with_results(messages, fcc_todo, approved_results) # type: ignore
# Post-process: Convert user messages with function_result content to proper tool messages.
# After _replace_approval_contents_with_results, approved tool calls have their results
@@ -525,6 +542,8 @@ async def _resolve_approval_responses(
# This transformation ensures the message history is valid for the LLM provider.
_convert_approval_results_to_tool_messages(messages)
return approved_results
def _convert_approval_results_to_tool_messages(messages: list[Message]) -> None:
"""Convert function_result content in user messages to proper tool messages.
@@ -787,7 +806,9 @@ async def run_agent_stream(
# Resolve approval responses (execute approved tools, replace approvals with results)
# This must happen before running the agent so it sees the tool results
tools_for_execution = tools if tools is not None else server_tools
await _resolve_approval_responses(messages, tools_for_execution, agent, run_kwargs, pending_approvals, thread_id)
resolved_approval_results = await _resolve_approval_responses(
messages, tools_for_execution, agent, run_kwargs, pending_approvals, thread_id
)
# Defense-in-depth: replace approval payloads in snapshot with actual tool results
# so CopilotKit does not re-send stale approval content on subsequent turns.
@@ -851,6 +872,9 @@ async def run_agent_stream(
yield StateSnapshotEvent(snapshot=flow.current_state)
run_started_emitted = True
for event in _make_approval_tool_result_events(resolved_approval_results):
yield event
# Feature #4: Detect tool-only messages (no text content)
# Emit TextMessageStartEvent to create message context for tool calls
if not flow.message_id and _has_only_tool_calls(update.contents):
@@ -905,7 +929,8 @@ async def run_agent_stream(
if state_schema and flow.current_state:
yield StateSnapshotEvent(snapshot=flow.current_state)
# Process structured output if response_format is set
for event in _make_approval_tool_result_events(resolved_approval_results):
yield event
if response_format is not None and all_updates:
from agent_framework import AgentResponse
from pydantic import BaseModel
@@ -0,0 +1,450 @@
# Copyright (c) Microsoft. All rights reserved.
"""Tests for TOOL_CALL_RESULT event emission on approval resume flows."""
from __future__ import annotations
import json
from typing import Any
from agent_framework import AgentResponseUpdate, Content, FunctionTool
from conftest import StubAgent
from agent_framework_ag_ui._agent import AgentConfig
from agent_framework_ag_ui._agent_run import run_agent_stream
def _make_weather_tool() -> FunctionTool:
    """Build an executable ``get_weather`` FunctionTool that always requires approval.

    The wrapped function returns ``"Sunny in <city>"`` so tests can check that
    the tool was really executed.
    """

    def get_weather(city: str) -> str:
        # Deterministic output keyed on the argument.
        return f"Sunny in {city}"

    tool = FunctionTool(
        name="get_weather",
        description="Get the weather for a city",
        func=get_weather,
        approval_mode="always_require",
    )
    return tool
async def test_approval_resume_emits_tool_call_result() -> None:
    """Resuming with an approved tool call must surface a TOOL_CALL_RESULT event.

    The resume payload follows the AG-UI approval pattern:
    - assistant message with tool_calls
    - tool message with {"accepted": true} content and toolCallId
    """
    tool_name = "get_weather"
    call_id = "call_abc123"

    tool = _make_weather_tool()
    reply = AgentResponseUpdate(contents=[Content.from_text(text="The weather is sunny.")], role="assistant")
    stub = StubAgent(updates=[reply], default_options={"tools": [tool]})
    cfg = AgentConfig()

    # Resume history: user query, assistant tool call, approval response.
    pending_call: dict[str, Any] = {
        "id": call_id,
        "type": "function",
        "function": {"name": tool_name, "arguments": json.dumps({"city": "Seattle"})},
    }
    approval: dict[str, Any] = {
        "role": "tool",
        "content": json.dumps({"accepted": True}),
        "toolCallId": call_id,
    }
    history: list[dict[str, Any]] = [
        {"role": "user", "content": "What's the weather in Seattle?"},
        {"role": "assistant", "content": "", "tool_calls": [pending_call]},
        approval,
    ]
    payload: dict[str, Any] = {
        "thread_id": "thread-approval-result",
        "run_id": "run-resume",
        "messages": history,
    }

    events: list[Any] = [event async for event in run_agent_stream(payload, stub, cfg)]

    event_types = [getattr(e, "type", None) for e in events]
    assert "RUN_STARTED" in event_types, f"Expected RUN_STARTED, got types: {event_types}"
    assert "RUN_FINISHED" in event_types, f"Expected RUN_FINISHED, got types: {event_types}"

    # The approved tool must have produced at least one TOOL_CALL_RESULT.
    tool_result_events = [e for e, kind in zip(events, event_types) if kind == "TOOL_CALL_RESULT"]
    assert tool_result_events, (
        f"Expected at least one TOOL_CALL_RESULT event for the approved tool, "
        f"but found none. Event types in stream: {event_types}"
    )

    result_event = tool_result_events[0]
    assert result_event.tool_call_id == call_id, (
        f"Expected TOOL_CALL_RESULT with tool_call_id={call_id}, got tool_call_id={result_event.tool_call_id}"
    )
    # The content is the real output of the executed tool.
    assert result_event.content == "Sunny in Seattle"
async def test_approval_resume_result_has_content() -> None:
    """An approved tool yields exactly one TOOL_CALL_RESULT carrying its execution output."""
    tool_name = "get_weather"
    call_id = "call_content_check"

    tool = _make_weather_tool()
    reply = AgentResponseUpdate(contents=[Content.from_text(text="Done.")], role="assistant")
    stub = StubAgent(updates=[reply], default_options={"tools": [tool]})
    cfg = AgentConfig()

    pending_call: dict[str, Any] = {
        "id": call_id,
        "type": "function",
        "function": {"name": tool_name, "arguments": json.dumps({"city": "Portland"})},
    }
    history: list[dict[str, Any]] = [
        {"role": "user", "content": "Check the weather"},
        {"role": "assistant", "content": "", "tool_calls": [pending_call]},
        {"role": "tool", "content": json.dumps({"accepted": True}), "toolCallId": call_id},
    ]
    payload: dict[str, Any] = {
        "thread_id": "thread-result-content",
        "run_id": "run-resume-2",
        "messages": history,
    }

    events: list[Any] = [event async for event in run_agent_stream(payload, stub, cfg)]

    tool_result_events = [e for e in events if getattr(e, "type", None) == "TOOL_CALL_RESULT"]
    assert len(tool_result_events) == 1

    result_event = tool_result_events[0]
    assert result_event.tool_call_id == call_id
    assert result_event.role == "tool"
    # A string returned by the tool is passed through as the event content.
    assert result_event.content == "Sunny in Portland"
async def test_no_approval_no_extra_tool_result() -> None:
    """A plain run without any approval response must not yield TOOL_CALL_RESULT events."""
    greeting = AgentResponseUpdate(contents=[Content.from_text(text="Hello.")], role="assistant")
    stub = StubAgent(updates=[greeting])
    cfg = AgentConfig()

    history: list[dict[str, Any]] = [{"role": "user", "content": "Hi"}]
    payload: dict[str, Any] = {
        "thread_id": "thread-no-approval",
        "run_id": "run-normal",
        "messages": history,
    }

    events: list[Any] = [event async for event in run_agent_stream(payload, stub, cfg)]

    tool_result_events = [e for e in events if getattr(e, "type", None) == "TOOL_CALL_RESULT"]
    assert not tool_result_events, f"Unexpected TOOL_CALL_RESULT events: {tool_result_events}"
async def test_rejection_does_not_emit_tool_call_result() -> None:
    """A rejected tool call must not show up as a TOOL_CALL_RESULT event in the stream."""
    tool_name = "get_weather"
    call_id = "call_rejected"

    tool = _make_weather_tool()
    reply = AgentResponseUpdate(contents=[Content.from_text(text="OK, I won't check.")], role="assistant")
    stub = StubAgent(updates=[reply], default_options={"tools": [tool]})
    cfg = AgentConfig()

    pending_call: dict[str, Any] = {
        "id": call_id,
        "type": "function",
        "function": {"name": tool_name, "arguments": json.dumps({"city": "Denver"})},
    }
    # The user declines the call: accepted is False.
    rejection: dict[str, Any] = {
        "role": "tool",
        "content": json.dumps({"accepted": False}),
        "toolCallId": call_id,
    }
    history: list[dict[str, Any]] = [
        {"role": "user", "content": "What's the weather?"},
        {"role": "assistant", "content": "", "tool_calls": [pending_call]},
        rejection,
    ]
    payload: dict[str, Any] = {
        "thread_id": "thread-rejection",
        "run_id": "run-rejected",
        "messages": history,
    }

    events: list[Any] = [event async for event in run_agent_stream(payload, stub, cfg)]

    tool_result_events = [e for e in events if getattr(e, "type", None) == "TOOL_CALL_RESULT"]
    assert not tool_result_events, (
        f"Expected no TOOL_CALL_RESULT for rejected tool, got {len(tool_result_events)}"
    )
def _make_temperature_tool() -> FunctionTool:
    """Build an executable ``get_temperature`` FunctionTool that always requires approval.

    The wrapped function returns ``"72F in <city>"``.
    """

    def get_temperature(city: str) -> str:
        # Fixed reading; only the city varies.
        return f"72F in {city}"

    tool = FunctionTool(
        name="get_temperature",
        description="Get the temperature for a city",
        func=get_temperature,
        approval_mode="always_require",
    )
    return tool
async def test_mixed_approve_reject_emits_only_approved_tool_result() -> None:
    """With one approved and one rejected call, only the approved one yields a TOOL_CALL_RESULT event."""
    weather_tool = _make_weather_tool()
    temperature_tool = _make_temperature_tool()
    approved_call_id = "call_approved"
    rejected_call_id = "call_rejected"

    reply = AgentResponseUpdate(contents=[Content.from_text(text="Here are the results.")], role="assistant")
    stub = StubAgent(updates=[reply], default_options={"tools": [weather_tool, temperature_tool]})
    cfg = AgentConfig()

    # Both calls target Seattle; they differ only in id and function name.
    requested = [(approved_call_id, "get_weather"), (rejected_call_id, "get_temperature")]
    tool_calls: list[dict[str, Any]] = [
        {
            "id": pending_id,
            "type": "function",
            "function": {"name": fn_name, "arguments": json.dumps({"city": "Seattle"})},
        }
        for pending_id, fn_name in requested
    ]
    # One tool message per decision: the first is accepted, the second declined.
    decisions: list[dict[str, Any]] = [
        {"role": "tool", "content": json.dumps({"accepted": accepted}), "toolCallId": decided_id}
        for decided_id, accepted in ((approved_call_id, True), (rejected_call_id, False))
    ]
    history: list[dict[str, Any]] = [
        {"role": "user", "content": "Weather and temperature in Seattle?"},
        {"role": "assistant", "content": "", "tool_calls": tool_calls},
        *decisions,
    ]
    payload: dict[str, Any] = {
        "thread_id": "thread-mixed",
        "run_id": "run-mixed",
        "messages": history,
    }

    events: list[Any] = [event async for event in run_agent_stream(payload, stub, cfg)]

    tool_result_events = [e for e in events if getattr(e, "type", None) == "TOOL_CALL_RESULT"]
    # Exactly one result event is expected, and it belongs to the approved call.
    assert len(tool_result_events) == 1, (
        f"Expected exactly 1 TOOL_CALL_RESULT (approved only), got {len(tool_result_events)}"
    )
    only_event = tool_result_events[0]
    assert only_event.tool_call_id == approved_call_id
    assert only_event.content == "Sunny in Seattle"
async def test_approval_resume_zero_updates_emits_tool_result() -> None:
    """Even when the agent streams zero updates, the approved tool's TOOL_CALL_RESULT is still emitted."""
    tool_name = "get_weather"
    call_id = "call_zero_updates"

    tool = _make_weather_tool()
    # No updates at all: the stub agent yields nothing during the run.
    stub = StubAgent(updates=[], default_options={"tools": [tool]})
    cfg = AgentConfig()

    pending_call: dict[str, Any] = {
        "id": call_id,
        "type": "function",
        "function": {"name": tool_name, "arguments": json.dumps({"city": "Boston"})},
    }
    history: list[dict[str, Any]] = [
        {"role": "user", "content": "What's the weather?"},
        {"role": "assistant", "content": "", "tool_calls": [pending_call]},
        {"role": "tool", "content": json.dumps({"accepted": True}), "toolCallId": call_id},
    ]
    payload: dict[str, Any] = {
        "thread_id": "thread-zero-updates",
        "run_id": "run-zero-updates",
        "messages": history,
    }

    events: list[Any] = [event async for event in run_agent_stream(payload, stub, cfg)]

    event_types = [getattr(e, "type", None) for e in events]
    assert "RUN_STARTED" in event_types

    tool_result_events = [e for e, kind in zip(events, event_types) if kind == "TOOL_CALL_RESULT"]
    assert len(tool_result_events) == 1, (
        f"Expected 1 TOOL_CALL_RESULT in zero-updates fallback path, got {len(tool_result_events)}"
    )
    only_event = tool_result_events[0]
    assert only_event.tool_call_id == call_id
    assert only_event.content == "Sunny in Boston"
async def test_resolve_approval_responses_returns_only_approved() -> None:
    """Call _resolve_approval_responses directly with one approved and one rejected response.

    The return value must hold only the approved function_result, while the
    rejection result must still appear in the (in-place modified) messages.
    """
    from agent_framework import Message

    from agent_framework_ag_ui._agent_run import _resolve_approval_responses

    weather_tool = _make_weather_tool()
    temperature_tool = _make_temperature_tool()
    approved_call_id = "call_a"
    rejected_call_id = "call_r"

    def _function_call(name: str, call_id: str) -> Content:
        # A fresh Content per use so requests and responses never share an instance.
        return Content(type="function_call", name=name, call_id=call_id, arguments='{"city": "NYC"}')

    opening = Message(role="user", contents=[Content.from_text(text="Hi")])
    approval_requests = Message(
        role="assistant",
        contents=[
            Content(
                type="function_approval_request",
                id=approved_call_id,
                function_call=_function_call("get_weather", approved_call_id),
            ),
            Content(
                type="function_approval_request",
                id=rejected_call_id,
                function_call=_function_call("get_temperature", rejected_call_id),
            ),
        ],
    )
    approval_responses = Message(
        role="user",
        contents=[
            Content(
                type="function_approval_response",
                id=approved_call_id,
                approved=True,
                function_call=_function_call("get_weather", approved_call_id),
            ),
            Content(
                type="function_approval_response",
                id=rejected_call_id,
                approved=False,
                function_call=_function_call("get_temperature", rejected_call_id),
            ),
        ],
    )
    messages: list[Any] = [opening, approval_requests, approval_responses]

    agent = StubAgent(
        updates=[],
        default_options={"tools": [weather_tool, temperature_tool]},
    )

    results = await _resolve_approval_responses(messages, [weather_tool, temperature_tool], agent, {})

    # Only the approved call is returned.
    assert len(results) == 1
    approved_result = results[0]
    assert approved_result.call_id == approved_call_id
    assert approved_result.type == "function_result"

    # The rejection is recorded in the message history instead.
    rejection_results = [
        content
        for message in messages
        for content in message.contents
        if content.type == "function_result" and content.call_id == rejected_call_id
    ]
    assert len(rejection_results) == 1
    assert "rejected" in str(rejection_results[0].result).lower()