Python: Fix doubled tool_call arguments in MESSAGES_SNAPSHOT when streaming (#4200)

* fix: prevent doubled tool_call arguments in MESSAGES_SNAPSHOT

When streaming with client-side tools, some providers send a full-
arguments replay after the streaming deltas complete. The `_emit_tool_call`
function unconditionally appends every arguments delta to the internal
`flow.tool_calls_by_id` tracking dictionary via `+=`. When the replay
contains the exact same complete arguments string that was already
accumulated from prior deltas, the arguments get doubled (e.g.,
`{"todoText":"buy groceries"}{"todoText":"buy groceries"}`).

This causes `MESSAGES_SNAPSHOT` events to contain invalid doubled JSON in
`tool_calls[].function.arguments`, breaking any client or middleware that
relies on snapshots for state reconstruction.

The fix adds a guard (mirroring the existing duplicate guard in
`_emit_text`) that detects when the incoming delta exactly equals the
already-accumulated arguments string, indicating a full-arguments replay
rather than an incremental delta. In this case the append is skipped,
preventing the doubling.

The `ToolCallArgsEvent` deltas are still emitted correctly for real-time
streaming — only the internal snapshot accumulator is guarded.

Fixes #4194

* fix: move duplicate check before event emission + add test

Address Copilot review feedback:
1. Move duplicate full-arguments replay detection BEFORE emitting
   ToolCallArgsEvent, for consistency with _emit_text() which returns
   early without emitting any events on replay detection.
2. Add test_emit_tool_call_skips_duplicate_full_arguments_replay() to
   verify the duplicate detection behavior for tool call arguments,
   matching the existing test pattern for text content.
This commit is contained in:
L. Elaine Dazzio
2026-02-24 04:49:24 -05:00
committed by GitHub
Unverified
parent acc49196c1
commit f78fa27215
2 changed files with 54 additions and 0 deletions
@@ -195,6 +195,23 @@ def _emit_tool_call(
delta = (
content.arguments if isinstance(content.arguments, str) else json.dumps(make_json_safe(content.arguments))
)
if tool_call_id in flow.tool_calls_by_id:
accumulated = flow.tool_calls_by_id[tool_call_id]["function"]["arguments"]
# Guard against full-argument replay: if the accumulated arguments
# already equal the incoming delta, this is a non-delta replay of
# the complete arguments string (some providers send the full
# arguments again after streaming deltas). Skip the event emission
# and accumulation to prevent doubling in MESSAGES_SNAPSHOT.
# This mirrors the early-return behaviour of _emit_text().
# (Fixes #4194)
if accumulated and delta == accumulated:
logger.debug(
"Skipping duplicate full-arguments replay for tool_call_id=%s",
tool_call_id,
)
return events
events.append(ToolCallArgsEvent(tool_call_id=tool_call_id, delta=delta))
if tool_call_id in flow.tool_calls_by_id:
@@ -6,6 +6,7 @@ import pytest
from ag_ui.core import (
TextMessageEndEvent,
TextMessageStartEvent,
ToolCallArgsEvent,
)
from agent_framework import AgentResponseUpdate, Content, Message, ResponseStream
from agent_framework.exceptions import AgentInvalidResponseException
@@ -416,6 +417,42 @@ def test_emit_tool_call_generates_id():
assert flow.tool_call_id is not None # ID should be generated
def test_emit_tool_call_skips_duplicate_full_arguments_replay():
"""Test _emit_tool_call skips replayed full-arguments on an existing tool call.
This is a regression test for issue #4194 where some streaming providers
send the full arguments string again after streaming deltas, causing the
arguments to be doubled in MESSAGES_SNAPSHOT events.
Mirrors test_emit_text_skips_duplicate_full_message_delta for consistency.
"""
flow = FlowState()
full_args = '{"city": "Seattle"}'
# Step 1: Initial tool call with name + arguments (normal start)
content_start = Content.from_function_call(
call_id="call_dup",
name="get_weather",
arguments=full_args,
)
events_start = _emit_tool_call(content_start, flow)
# Should emit ToolCallStartEvent + ToolCallArgsEvent
assert any(isinstance(e, ToolCallArgsEvent) for e in events_start)
assert flow.tool_calls_by_id["call_dup"]["function"]["arguments"] == full_args
# Step 2: Provider replays the full arguments (duplicate)
content_replay = Content(type="function_call", call_id="call_dup", arguments=full_args)
events_replay = _emit_tool_call(content_replay, flow)
# Should NOT emit any ToolCallArgsEvent (early return on replay)
args_events = [e for e in events_replay if isinstance(e, ToolCallArgsEvent)]
assert args_events == [], "Duplicate full-arguments replay should not emit ToolCallArgsEvent"
# Accumulated arguments should remain unchanged
assert flow.tool_calls_by_id["call_dup"]["function"]["arguments"] == full_args
def test_emit_tool_result_closes_open_message():
"""Test _emit_tool_result emits TextMessageEndEvent for open text message.