Python: Stop emitting duplicate reasoning content from OpenAI response.reasoning_text.done and response.reasoning_summary_text.done events (#5162)

* Fix reasoning text done events duplicating streamed delta content (#5157)

The OpenAI Responses API sends both reasoning_text.delta (incremental
chunks) and reasoning_text.done (full accumulated text) events. The
chat client was emitting Content for both, causing ag-ui to append the
full done text onto already-accumulated delta text, producing
duplicated reasoning output.

Stop emitting Content for reasoning_text.done and
reasoning_summary_text.done events, matching how output_text.done is
already handled (not emitted). The deltas contain all the content;
the done event is redundant.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(openai): emit reasoning done content as fallback when no deltas observed (#5157)

Address PR review feedback:
- Track item_ids that received reasoning deltas via seen_reasoning_delta_item_ids set
- Emit content from done events only when no deltas were received for the
  item_id, preventing silent content loss on stream resumption
- Add comment documenting code_interpreter done event asymmetry
- Replace redundant ag-ui test with deduplication-focused test
- Add integration test for delta+done sequence in OpenAI chat client tests
- Add fallback path tests for done events without preceding deltas

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address review feedback for #5157: Python: [Bug]: "type": "response.reasoning_text.delta" and "response.reasoning_text.done" both get exposed as "text_reasoning"

* Fix AG-UI reasoning streaming to use proper Start/End pattern (#5157)

_emit_text_reasoning now follows the same streaming pattern as _emit_text:
- Emits ReasoningStartEvent/ReasoningMessageStartEvent only on the first
  delta for a given message_id
- Emits only ReasoningMessageContentEvent for subsequent deltas
- Defers ReasoningMessageEndEvent/ReasoningEndEvent until
  _close_reasoning_block is called (on content type switch or end-of-run)

This produces the correct protocol pattern:
  ReasoningStartEvent
    ReasoningMessageStartEvent
    ReasoningMessageContentEvent(delta1)
    ReasoningMessageContentEvent(delta2)
    ReasoningMessageEndEvent
  ReasoningEndEvent

Instead of wrapping every delta in a full Start→End sequence.

Backward compatibility is preserved: calling _emit_text_reasoning without
a flow argument still produces the full sequence per call.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix import ordering lint error in AG-UI test file (#5157)

Move inline import of TextMessageContentEvent to the top-level import
block and ensure alphabetical ordering to satisfy ruff I001 rule.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix mypy error: rename loop variable to avoid type conflict with WorkflowEvent

The 'event' variable was already typed as WorkflowEvent[Any] from the
async for loop at line 590. Reusing it in the _close_reasoning_block
loop (which returns list[BaseEvent]) caused an incompatible assignment
error. Renamed to 'reasoning_evt' to avoid the conflict.

Fixes #5162

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address review feedback for #5157: review comment fixes

* narrow test result reporting to explicit pytest JUnit XML

* Fix test args

* Fix pytest-results-action in merge workflow and remove committed test artifacts

Apply the same JUnit XML fix from python-tests.yml to python-merge-tests.yml:
add --junitxml=pytest.xml to all test commands and narrow the results action
path from ./python/**.xml to ./python/pytest.xml. Also remove accidentally
committed pytest.xml and python-coverage.xml and add them to .gitignore.

---------

Co-authored-by: Copilot <copilot@github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Evan Mattson
2026-04-10 07:44:59 +09:00
committed by GitHub
Unverified
parent 1dd828d255
commit 5e8fe0be1f
15 changed files with 412 additions and 89 deletions
+14 -8
View File
@@ -115,12 +115,13 @@ jobs:
-m "not integration"
--timeout=120 --session-timeout=900 --timeout_method thread
--retries 2 --retry-delay 5
--junitxml=pytest.xml
working-directory: ./python
- name: Surface failing tests
if: always()
uses: pmeier/pytest-results-action@v0.7.2
with:
path: ./python/**.xml
path: ./python/pytest.xml
summary: true
display-options: fEX
fail-on-empty: false
@@ -163,6 +164,7 @@ jobs:
-n logical --dist worksteal
--timeout=120 --session-timeout=900 --timeout_method thread
--retries 2 --retry-delay 5
--junitxml=pytest.xml
working-directory: ./python
- name: Test OpenAI samples
timeout-minutes: 10
@@ -173,7 +175,7 @@ jobs:
if: always()
uses: pmeier/pytest-results-action@v0.7.2
with:
path: ./python/**.xml
path: ./python/pytest.xml
summary: true
display-options: fEX
fail-on-empty: false
@@ -225,6 +227,7 @@ jobs:
-n logical --dist worksteal
--timeout=120 --session-timeout=900 --timeout_method thread
--retries 2 --retry-delay 5
--junitxml=pytest.xml
working-directory: ./python
- name: Test Azure samples
timeout-minutes: 10
@@ -235,7 +238,7 @@ jobs:
if: always()
uses: pmeier/pytest-results-action@v0.7.2
with:
path: ./python/**.xml
path: ./python/pytest.xml
summary: true
display-options: fEX
fail-on-empty: false
@@ -285,6 +288,7 @@ jobs:
-n logical --dist worksteal
--timeout=120 --session-timeout=900 --timeout_method thread
--retries 2 --retry-delay 5
--junitxml=pytest.xml
working-directory: ./python
- name: Stop local MCP server
if: always()
@@ -310,7 +314,7 @@ jobs:
if: always()
uses: pmeier/pytest-results-action@v0.7.2
with:
path: ./python/**.xml
path: ./python/pytest.xml
summary: true
display-options: fEX
fail-on-empty: false
@@ -375,12 +379,13 @@ jobs:
-x
--timeout=360 --session-timeout=900 --timeout_method thread
--retries 2 --retry-delay 5
--junitxml=pytest.xml
working-directory: ./python
- name: Surface failing tests
if: always()
uses: pmeier/pytest-results-action@v0.7.2
with:
path: ./python/**.xml
path: ./python/pytest.xml
summary: true
display-options: fEX
fail-on-empty: false
@@ -430,12 +435,13 @@ jobs:
-n logical --dist worksteal
--timeout=120 --session-timeout=900 --timeout_method thread
--retries 2 --retry-delay 5
--junitxml=pytest.xml
working-directory: ./python
- name: Surface failing tests
if: always()
uses: pmeier/pytest-results-action@v0.7.2
with:
path: ./python/**.xml
path: ./python/pytest.xml
summary: true
display-options: fEX
fail-on-empty: false
@@ -489,13 +495,13 @@ jobs:
echo "Cosmos DB emulator did not become ready in time." >&2
exit 1
- name: Test with pytest (Cosmos integration)
run: uv run --directory packages/azure-cosmos poe integration-tests -n logical --dist worksteal --timeout=120 --session-timeout=900 --timeout_method thread --retries 2 --retry-delay 5
run: uv run --directory packages/azure-cosmos poe integration-tests -n logical --dist worksteal --timeout=120 --session-timeout=900 --timeout_method thread --retries 2 --retry-delay 5 --junitxml=pytest.xml
working-directory: ./python
- name: Surface failing tests
if: always()
uses: pmeier/pytest-results-action@v0.7.2
with:
path: ./python/**.xml
path: ./python/pytest.xml
summary: true
display-options: fEX
fail-on-empty: false
+2 -2
View File
@@ -40,7 +40,7 @@ jobs:
UV_CACHE_DIR: /tmp/.uv-cache
# Unit tests
- name: Run all tests
run: uv run poe test -A
run: uv run poe test -A --junitxml=pytest.xml
working-directory: ./python
# Surface failing tests
@@ -48,7 +48,7 @@ jobs:
if: always()
uses: pmeier/pytest-results-action@v0.7.2
with:
path: ./python/**.xml
path: ./python/pytest.xml
summary: true
display-options: fEX
fail-on-empty: false
+2
View File
@@ -47,6 +47,8 @@ htmlcov/
.cache
nosetests.xml
coverage.xml
pytest.xml
python-coverage.xml
*.cover
*.py,cover
.hypothesis/
@@ -46,6 +46,7 @@ from ._orchestration._tooling import collect_server_tools, merge_tools, register
from ._run_common import (
FlowState,
_build_run_finished_event, # type: ignore
_close_reasoning_block, # type: ignore
_emit_content, # type: ignore
_extract_resume_payload, # type: ignore
_has_only_tool_calls, # type: ignore
@@ -1058,6 +1059,10 @@ async def run_agent_stream(
}
)
# Close any open reasoning block
for event in _close_reasoning_block(flow):
yield event
# Close any open message
if flow.message_id:
logger.debug(f"End of run: closing text message message_id={flow.message_id}")
@@ -128,6 +128,7 @@ class FlowState:
interrupts: list[dict[str, Any]] = field(default_factory=list) # pyright: ignore[reportUnknownVariableType]
reasoning_messages: list[dict[str, Any]] = field(default_factory=list) # pyright: ignore[reportUnknownVariableType]
accumulated_reasoning: dict[str, str] = field(default_factory=dict) # pyright: ignore[reportUnknownVariableType]
reasoning_message_id: str | None = None
def get_tool_name(self, call_id: str | None) -> str | None:
"""Get tool name by call ID."""
@@ -462,12 +463,39 @@ def _emit_mcp_tool_result(
return _emit_tool_result_common(content.call_id, raw_output, flow, predictive_handler)
def _close_reasoning_block(flow: FlowState) -> list[BaseEvent]:
"""Close an open reasoning block, emitting end events.
Should be called when the reasoning block is complete -- e.g. when
non-reasoning content arrives or at end of a run.
"""
if not flow.reasoning_message_id:
return []
message_id = flow.reasoning_message_id
flow.reasoning_message_id = None
return [
ReasoningMessageEndEvent(message_id=message_id),
ReasoningEndEvent(message_id=message_id),
]
def _emit_text_reasoning(content: Content, flow: FlowState | None = None) -> list[BaseEvent]:
"""Emit AG-UI reasoning events for text_reasoning content.
Uses the protocol-defined reasoning event types so that AG-UI consumers
such as CopilotKit can render reasoning natively.
When *flow* is provided the function follows the streaming pattern: it
emits ``ReasoningStartEvent`` / ``ReasoningMessageStartEvent`` only on
the first delta for a given ``message_id`` and just
``ReasoningMessageContentEvent`` for subsequent deltas. The matching
``ReasoningMessageEndEvent`` / ``ReasoningEndEvent`` are deferred until
``_close_reasoning_block`` is called (e.g. when non-reasoning content
arrives or at end-of-run).
Without *flow* (backward-compat) the full Start→Content→End sequence is
emitted for every call.
Only ``content.text`` is used for the visible reasoning message. If
``content.protected_data`` is present it is emitted as a
``ReasoningEncryptedValueEvent`` so that consumers can persist encrypted
@@ -483,26 +511,49 @@ def _emit_text_reasoning(content: Content, flow: FlowState | None = None) -> lis
message_id = content.id or generate_event_id()
events: list[BaseEvent] = [
ReasoningStartEvent(message_id=message_id),
ReasoningMessageStartEvent(message_id=message_id, role="assistant"),
]
events: list[BaseEvent] = []
if text:
events.append(ReasoningMessageContentEvent(message_id=message_id, delta=text))
if flow is not None:
# Streaming mode: track open reasoning block in flow state.
if flow.reasoning_message_id != message_id:
# Close any previously open reasoning block (different message_id).
events.extend(_close_reasoning_block(flow))
# Open new reasoning block.
events.append(ReasoningStartEvent(message_id=message_id))
events.append(ReasoningMessageStartEvent(message_id=message_id, role="assistant"))
flow.reasoning_message_id = message_id
events.append(ReasoningMessageEndEvent(message_id=message_id))
if text:
events.append(ReasoningMessageContentEvent(message_id=message_id, delta=text))
if content.protected_data is not None:
events.append(
ReasoningEncryptedValueEvent(
subtype="message",
entity_id=message_id,
encrypted_value=content.protected_data,
if content.protected_data is not None:
events.append(
ReasoningEncryptedValueEvent(
subtype="message",
entity_id=message_id,
encrypted_value=content.protected_data,
)
)
)
else:
# No flow -- backward-compatible full sequence per call.
events.append(ReasoningStartEvent(message_id=message_id))
events.append(ReasoningMessageStartEvent(message_id=message_id, role="assistant"))
events.append(ReasoningEndEvent(message_id=message_id))
if text:
events.append(ReasoningMessageContentEvent(message_id=message_id, delta=text))
events.append(ReasoningMessageEndEvent(message_id=message_id))
if content.protected_data is not None:
events.append(
ReasoningEncryptedValueEvent(
subtype="message",
entity_id=message_id,
encrypted_value=content.protected_data,
)
)
events.append(ReasoningEndEvent(message_id=message_id))
# Persist reasoning into flow state for MESSAGES_SNAPSHOT.
# Accumulate reasoning text per message_id, similar to flow.accumulated_text,
@@ -546,23 +597,30 @@ def _emit_content(
) -> list[BaseEvent]:
"""Emit appropriate events for any content type."""
content_type = getattr(content, "type", None)
# Close open reasoning block when switching to non-reasoning content.
if content_type != "text_reasoning":
events = _close_reasoning_block(flow)
else:
events = []
if content_type == "text":
return _emit_text(content, flow, skip_text)
return events + _emit_text(content, flow, skip_text)
if content_type == "function_call":
return _emit_tool_call(content, flow, predictive_handler)
return events + _emit_tool_call(content, flow, predictive_handler)
if content_type == "function_result":
return _emit_tool_result(content, flow, predictive_handler)
return events + _emit_tool_result(content, flow, predictive_handler)
if content_type == "function_approval_request":
return _emit_approval_request(content, flow, predictive_handler, require_confirmation)
return events + _emit_approval_request(content, flow, predictive_handler, require_confirmation)
if content_type == "usage":
return _emit_usage(content)
return events + _emit_usage(content)
if content_type == "oauth_consent_request":
return _emit_oauth_consent(content)
return events + _emit_oauth_consent(content)
if content_type == "mcp_server_tool_call":
return _emit_mcp_tool_call(content, flow)
return events + _emit_mcp_tool_call(content, flow)
if content_type == "mcp_server_tool_result":
return _emit_mcp_tool_result(content, flow, predictive_handler)
return events + _emit_mcp_tool_result(content, flow, predictive_handler)
if content_type == "text_reasoning":
return _emit_text_reasoning(content, flow)
logger.debug("Skipping unsupported content type in AG-UI emitter: %s", content_type)
return []
return events
@@ -29,6 +29,7 @@ from ._message_adapters import normalize_agui_input_messages
from ._run_common import (
FlowState,
_build_run_finished_event,
_close_reasoning_block,
_emit_content,
_extract_resume_payload,
_normalize_resume_interrupts,
@@ -729,6 +730,9 @@ async def run_workflow_stream(
run_error_emitted = True
terminal_emitted = True
for reasoning_evt in _close_reasoning_block(flow):
yield reasoning_evt
for end_event in _drain_open_message():
yield end_event
+140 -1
View File
@@ -11,6 +11,7 @@ from ag_ui.core import (
ReasoningMessageEndEvent,
ReasoningMessageStartEvent,
ReasoningStartEvent,
TextMessageContentEvent,
TextMessageEndEvent,
TextMessageStartEvent,
ToolCallArgsEvent,
@@ -29,6 +30,7 @@ from agent_framework_ag_ui._agent_run import (
from agent_framework_ag_ui._run_common import (
FlowState,
_build_run_finished_event,
_close_reasoning_block,
_emit_approval_request,
_emit_content,
_emit_mcp_tool_call,
@@ -1344,8 +1346,11 @@ class TestEmitContentMcpRouting:
events = _emit_content(content, flow)
assert len(events) == 5
# Streaming pattern: Start + MessageStart + Content (no End events yet)
assert len(events) == 3
assert isinstance(events[0], ReasoningStartEvent)
assert isinstance(events[1], ReasoningMessageStartEvent)
assert isinstance(events[2], ReasoningMessageContentEvent)
class TestReasoningInSnapshot:
@@ -1501,3 +1506,137 @@ class TestReasoningInSnapshot:
assert len(flow.reasoning_messages) == 1
assert flow.reasoning_messages[0]["content"] == "part1 part2"
assert flow.reasoning_messages[0]["encryptedValue"] == "encrypted-payload"
def test_reasoning_done_after_deltas_does_not_duplicate(self):
"""A done-style content arriving after deltas does not duplicate accumulated text.
The upstream client should skip done events when deltas preceded them,
but if one leaks through, the accumulator must not double-append.
This test verifies that only the delta-produced text is stored.
"""
flow = FlowState()
msg_id = "reason_dedup"
delta1 = Content.from_text_reasoning(id=msg_id, text="Hello ")
delta2 = Content.from_text_reasoning(id=msg_id, text="world")
_emit_text_reasoning(delta1, flow)
_emit_text_reasoning(delta2, flow)
# Accumulated text should equal the concatenation of deltas only
assert len(flow.reasoning_messages) == 1
assert flow.reasoning_messages[0]["content"] == "Hello world"
assert flow.reasoning_messages[0]["id"] == msg_id
def test_reasoning_deltas_emit_one_content_event_each(self):
"""Each reasoning delta emits exactly one ReasoningMessageContentEvent
within a single Start/End sequence (streaming pattern)."""
flow = FlowState()
msg_id = "reason_evt"
delta1 = Content.from_text_reasoning(id=msg_id, text="Think ")
delta2 = Content.from_text_reasoning(id=msg_id, text="hard")
events1 = _emit_text_reasoning(delta1, flow)
events2 = _emit_text_reasoning(delta2, flow)
close_events = _close_reasoning_block(flow)
all_events = events1 + events2 + close_events
content_events = [e for e in all_events if isinstance(e, ReasoningMessageContentEvent)]
assert len(content_events) == 2
assert content_events[0].delta == "Think "
assert content_events[1].delta == "hard"
# Streaming pattern: one Start/End sequence wrapping both content events
start_events = [e for e in all_events if isinstance(e, ReasoningStartEvent)]
end_events = [e for e in all_events if isinstance(e, ReasoningEndEvent)]
msg_start_events = [e for e in all_events if isinstance(e, ReasoningMessageStartEvent)]
msg_end_events = [e for e in all_events if isinstance(e, ReasoningMessageEndEvent)]
assert len(start_events) == 1
assert len(end_events) == 1
assert len(msg_start_events) == 1
assert len(msg_end_events) == 1
def test_reasoning_streaming_event_order(self):
"""Streaming reasoning emits Start once, then Content per delta, then End on close."""
flow = FlowState()
msg_id = "reason_order"
d1 = Content.from_text_reasoning(id=msg_id, text="A ")
d2 = Content.from_text_reasoning(id=msg_id, text="B ")
d3 = Content.from_text_reasoning(id=msg_id, text="C")
events = []
events.extend(_emit_text_reasoning(d1, flow))
events.extend(_emit_text_reasoning(d2, flow))
events.extend(_emit_text_reasoning(d3, flow))
events.extend(_close_reasoning_block(flow))
assert isinstance(events[0], ReasoningStartEvent)
assert isinstance(events[1], ReasoningMessageStartEvent)
assert isinstance(events[2], ReasoningMessageContentEvent)
assert events[2].delta == "A "
assert isinstance(events[3], ReasoningMessageContentEvent)
assert events[3].delta == "B "
assert isinstance(events[4], ReasoningMessageContentEvent)
assert events[4].delta == "C"
assert isinstance(events[5], ReasoningMessageEndEvent)
assert isinstance(events[6], ReasoningEndEvent)
assert len(events) == 7
def test_close_reasoning_block_noop_when_not_open(self):
"""_close_reasoning_block returns empty list when no reasoning block is open."""
flow = FlowState()
assert _close_reasoning_block(flow) == []
def test_close_reasoning_block_resets_state(self):
"""_close_reasoning_block clears reasoning_message_id."""
flow = FlowState()
_emit_text_reasoning(Content.from_text_reasoning(id="r1", text="x"), flow)
assert flow.reasoning_message_id == "r1"
_close_reasoning_block(flow)
assert flow.reasoning_message_id is None
def test_emit_content_closes_reasoning_on_text(self):
"""Switching from reasoning to text content auto-closes reasoning block."""
flow = FlowState()
reasoning = Content.from_text_reasoning(id="r1", text="thinking")
text = Content.from_text("answer")
r_events = _emit_content(reasoning, flow)
t_events = _emit_content(text, flow)
# reasoning events: Start + MsgStart + Content
assert isinstance(r_events[0], ReasoningStartEvent)
# text events should start with reasoning End events
assert isinstance(t_events[0], ReasoningMessageEndEvent)
assert isinstance(t_events[1], ReasoningEndEvent)
# then text start
assert isinstance(t_events[2], TextMessageStartEvent)
assert isinstance(t_events[3], TextMessageContentEvent)
def test_reasoning_distinct_ids_close_previous_block(self):
"""Emitting reasoning with a new message_id auto-closes the previous block."""
flow = FlowState()
c1 = Content.from_text_reasoning(id="block1", text="first")
c2 = Content.from_text_reasoning(id="block2", text="second")
events1 = _emit_text_reasoning(c1, flow)
events2 = _emit_text_reasoning(c2, flow)
close = _close_reasoning_block(flow)
# events1: Start(block1) + MsgStart(block1) + Content(block1)
assert events1[0].message_id == "block1"
# events2: MsgEnd(block1) + End(block1) + Start(block2) + MsgStart(block2) + Content(block2)
assert isinstance(events2[0], ReasoningMessageEndEvent)
assert events2[0].message_id == "block1"
assert isinstance(events2[1], ReasoningEndEvent)
assert events2[1].message_id == "block1"
assert isinstance(events2[2], ReasoningStartEvent)
assert events2[2].message_id == "block2"
# close: MsgEnd(block2) + End(block2)
assert isinstance(close[0], ReasoningMessageEndEvent)
assert close[0].message_id == "block2"
@@ -315,10 +315,7 @@ class CosmosCheckpointStorage:
"""
await self._ensure_container_proxy()
query = (
"SELECT * FROM c WHERE c.workflow_name = @workflow_name "
"ORDER BY c.timestamp DESC OFFSET 0 LIMIT 1"
)
query = "SELECT * FROM c WHERE c.workflow_name = @workflow_name ORDER BY c.timestamp DESC OFFSET 0 LIMIT 1"
parameters: list[dict[str, object]] = [
{"name": "@workflow_name", "value": workflow_name},
]
@@ -351,10 +348,7 @@ class CosmosCheckpointStorage:
"""
await self._ensure_container_proxy()
query = (
"SELECT c.checkpoint_id FROM c WHERE c.workflow_name = @workflow_name "
"ORDER BY c.timestamp ASC"
)
query = "SELECT c.checkpoint_id FROM c WHERE c.workflow_name = @workflow_name ORDER BY c.timestamp ASC"
parameters: list[dict[str, object]] = [
{"name": "@workflow_name", "value": workflow_name},
]
@@ -402,9 +402,7 @@ async def test_list_checkpoint_ids_empty_returns_empty(mock_container: MagicMock
# --- Tests for close and context manager ---
async def test_close_closes_owned_client(
monkeypatch: pytest.MonkeyPatch, mock_cosmos_client: MagicMock
) -> None:
async def test_close_closes_owned_client(monkeypatch: pytest.MonkeyPatch, mock_cosmos_client: MagicMock) -> None:
mock_factory = MagicMock(return_value=mock_cosmos_client)
monkeypatch.setattr(checkpoint_storage_module, "CosmosClient", mock_factory)
@@ -512,6 +512,7 @@ class RawOpenAIChatClient( # type: ignore[misc]
if stream:
function_call_ids: dict[int, tuple[str, str]] = {}
seen_reasoning_delta_item_ids: set[str] = set()
validated_options: dict[str, Any] | None = None
async def _stream() -> AsyncIterable[ChatResponseUpdate]:
@@ -530,6 +531,7 @@ class RawOpenAIChatClient( # type: ignore[misc]
chunk,
options=validated_options,
function_call_ids=function_call_ids,
seen_reasoning_delta_item_ids=seen_reasoning_delta_item_ids,
)
except Exception as ex:
self._handle_request_error(ex)
@@ -1930,6 +1932,7 @@ class RawOpenAIChatClient( # type: ignore[misc]
event: OpenAIResponseStreamEvent,
options: dict[str, Any],
function_call_ids: dict[int, tuple[str, str]],
seen_reasoning_delta_item_ids: set[str] | None = None,
) -> ChatResponseUpdate:
"""Parse an OpenAI Responses API streaming event into a ChatResponseUpdate."""
metadata: dict[str, Any] = {}
@@ -2008,6 +2011,8 @@ class RawOpenAIChatClient( # type: ignore[misc]
contents.append(Content.from_text(text=event.delta, raw_representation=event))
metadata.update(self._get_metadata_from_response(event))
case "response.reasoning_text.delta":
if seen_reasoning_delta_item_ids is not None:
seen_reasoning_delta_item_ids.add(event.item_id)
contents.append(
Content.from_text_reasoning(
id=event.item_id,
@@ -2017,15 +2022,21 @@ class RawOpenAIChatClient( # type: ignore[misc]
)
metadata.update(self._get_metadata_from_response(event))
case "response.reasoning_text.done":
contents.append(
Content.from_text_reasoning(
id=event.item_id,
text=event.text,
raw_representation=event,
# Done event carries the full accumulated text. Emit it only as a
# fallback when no delta was already received for this item_id, to
# avoid duplicating content in downstream accumulators (e.g. ag-ui).
if seen_reasoning_delta_item_ids is None or event.item_id not in seen_reasoning_delta_item_ids:
contents.append(
Content.from_text_reasoning(
id=event.item_id,
text=event.text,
raw_representation=event,
)
)
)
metadata.update(self._get_metadata_from_response(event))
case "response.reasoning_summary_text.delta":
if seen_reasoning_delta_item_ids is not None:
seen_reasoning_delta_item_ids.add(event.item_id)
contents.append(
Content.from_text_reasoning(
id=event.item_id,
@@ -2035,13 +2046,17 @@ class RawOpenAIChatClient( # type: ignore[misc]
)
metadata.update(self._get_metadata_from_response(event))
case "response.reasoning_summary_text.done":
contents.append(
Content.from_text_reasoning(
id=event.item_id,
text=event.text,
raw_representation=event,
# Done event carries the full accumulated text. Emit it only as a
# fallback when no delta was already received for this item_id, to
# avoid duplicating content in downstream accumulators (e.g. ag-ui).
if seen_reasoning_delta_item_ids is None or event.item_id not in seen_reasoning_delta_item_ids:
contents.append(
Content.from_text_reasoning(
id=event.item_id,
text=event.text,
raw_representation=event,
)
)
)
metadata.update(self._get_metadata_from_response(event))
case "response.code_interpreter_call_code.delta":
call_id = getattr(event, "call_id", None) or getattr(event, "id", None) or event.item_id
@@ -2065,6 +2080,9 @@ class RawOpenAIChatClient( # type: ignore[misc]
)
)
metadata.update(self._get_metadata_from_response(event))
# NOTE: Unlike reasoning done events, code_interpreter done events always
# emit content because downstream consumers do not accumulate
# code_interpreter deltas the same way.
case "response.code_interpreter_call_code.done":
call_id = getattr(event, "call_id", None) or getattr(event, "id", None) or event.item_id
ci_additional_properties = {
@@ -2808,11 +2808,12 @@ def test_streaming_reasoning_text_delta_event() -> None:
mock_metadata.assert_called_once_with(event)
def test_streaming_reasoning_text_done_event() -> None:
"""Test reasoning text done event creates TextReasoningContent with complete text."""
def test_streaming_reasoning_text_done_event_skipped_after_deltas() -> None:
"""Test reasoning text done event does not emit content when deltas were already received."""
client = OpenAIChatClient(model="test-model", api_key="test-key")
chat_options = ChatOptions()
function_call_ids: dict[int, tuple[str, str]] = {}
seen_reasoning_delta_item_ids: set[str] = {"reasoning_456"}
event = ResponseReasoningTextDoneEvent(
type="response.reasoning_text.done",
@@ -2824,12 +2825,40 @@ def test_streaming_reasoning_text_done_event() -> None:
)
with patch.object(client, "_get_metadata_from_response", return_value={"test": "data"}) as mock_metadata:
response = client._parse_chunk_from_openai(event, chat_options, function_call_ids) # type: ignore
response = client._parse_chunk_from_openai(
event, chat_options, function_call_ids, seen_reasoning_delta_item_ids
) # type: ignore
assert len(response.contents) == 0
mock_metadata.assert_called_once_with(event)
assert response.additional_properties == {"test": "data"}
def test_streaming_reasoning_text_done_event_fallback_without_deltas() -> None:
"""Test reasoning text done event emits content when no deltas were received for this item_id."""
client = OpenAIChatClient(model="test-model", api_key="test-key")
chat_options = ChatOptions()
function_call_ids: dict[int, tuple[str, str]] = {}
seen_reasoning_delta_item_ids: set[str] = set()
event = ResponseReasoningTextDoneEvent(
type="response.reasoning_text.done",
content_index=0,
item_id="reasoning_456",
output_index=0,
sequence_number=2,
text="complete reasoning",
)
with patch.object(client, "_get_metadata_from_response", return_value={"test": "data"}) as mock_metadata:
response = client._parse_chunk_from_openai(
event, chat_options, function_call_ids, seen_reasoning_delta_item_ids
) # type: ignore
assert len(response.contents) == 1
assert response.contents[0].type == "text_reasoning"
assert response.contents[0].id == "reasoning_456"
assert response.contents[0].text == "complete reasoning"
assert response.contents[0].raw_representation == event
mock_metadata.assert_called_once_with(event)
assert response.additional_properties == {"test": "data"}
@@ -2859,11 +2888,12 @@ def test_streaming_reasoning_summary_text_delta_event() -> None:
mock_metadata.assert_called_once_with(event)
def test_streaming_reasoning_summary_text_done_event() -> None:
"""Test reasoning summary text done event creates TextReasoningContent with complete text."""
def test_streaming_reasoning_summary_text_done_event_skipped_after_deltas() -> None:
"""Test reasoning summary text done event does not emit content when deltas were already received."""
client = OpenAIChatClient(model="test-model", api_key="test-key")
chat_options = ChatOptions()
function_call_ids: dict[int, tuple[str, str]] = {}
seen_reasoning_delta_item_ids: set[str] = {"summary_012"}
event = ResponseReasoningSummaryTextDoneEvent(
type="response.reasoning_summary_text.done",
@@ -2875,16 +2905,94 @@ def test_streaming_reasoning_summary_text_done_event() -> None:
)
with patch.object(client, "_get_metadata_from_response", return_value={"custom": "meta"}) as mock_metadata:
response = client._parse_chunk_from_openai(event, chat_options, function_call_ids) # type: ignore
response = client._parse_chunk_from_openai(
event, chat_options, function_call_ids, seen_reasoning_delta_item_ids
) # type: ignore
assert len(response.contents) == 0
mock_metadata.assert_called_once_with(event)
assert response.additional_properties == {"custom": "meta"}
def test_streaming_reasoning_summary_text_done_event_fallback_without_deltas() -> None:
"""Test reasoning summary text done event emits content when no deltas were received for this item_id."""
client = OpenAIChatClient(model="test-model", api_key="test-key")
chat_options = ChatOptions()
function_call_ids: dict[int, tuple[str, str]] = {}
seen_reasoning_delta_item_ids: set[str] = set()
event = ResponseReasoningSummaryTextDoneEvent(
type="response.reasoning_summary_text.done",
item_id="summary_012",
output_index=0,
sequence_number=4,
summary_index=0,
text="complete summary",
)
with patch.object(client, "_get_metadata_from_response", return_value={"custom": "meta"}) as mock_metadata:
response = client._parse_chunk_from_openai(
event, chat_options, function_call_ids, seen_reasoning_delta_item_ids
) # type: ignore
assert len(response.contents) == 1
assert response.contents[0].type == "text_reasoning"
assert response.contents[0].id == "summary_012"
assert response.contents[0].text == "complete summary"
assert response.contents[0].raw_representation == event
mock_metadata.assert_called_once_with(event)
assert response.additional_properties == {"custom": "meta"}
def test_streaming_reasoning_deltas_then_done_no_duplication() -> None:
"""Sending delta events followed by a done event produces content only from deltas."""
client = OpenAIChatClient(model="test-model", api_key="test-key")
chat_options = ChatOptions()
function_call_ids: dict[int, tuple[str, str]] = {}
seen_reasoning_delta_item_ids: set[str] = set()
item_id = "reasoning_seq"
delta1 = ResponseReasoningTextDeltaEvent(
type="response.reasoning_text.delta",
content_index=0,
item_id=item_id,
output_index=0,
sequence_number=1,
delta="Hello ",
)
delta2 = ResponseReasoningTextDeltaEvent(
type="response.reasoning_text.delta",
content_index=0,
item_id=item_id,
output_index=0,
sequence_number=2,
delta="world",
)
done = ResponseReasoningTextDoneEvent(
type="response.reasoning_text.done",
content_index=0,
item_id=item_id,
output_index=0,
sequence_number=3,
text="Hello world",
)
all_contents = []
with patch.object(client, "_get_metadata_from_response", return_value={}):
for event in [delta1, delta2, done]:
response = client._parse_chunk_from_openai(
event,
chat_options,
function_call_ids,
seen_reasoning_delta_item_ids, # type: ignore
)
all_contents.extend(response.contents)
assert len(all_contents) == 2
assert all_contents[0].text == "Hello "
assert all_contents[1].text == "world"
assert "".join(c.text for c in all_contents) == "Hello world"
def test_streaming_reasoning_events_preserve_metadata() -> None:
"""Test that reasoning events preserve metadata like regular text events."""
client = OpenAIChatClient(model="test-model", api_key="test-key")
@@ -82,9 +82,7 @@ async def main() -> None:
):
session = agent.create_session()
response1 = await agent.run(
"My name is Ada. I'm building a distributed database in Rust.", session=session
)
response1 = await agent.run("My name is Ada. I'm building a distributed database in Rust.", session=session)
print("User: My name is Ada. I'm building a distributed database in Rust.")
print(f"Assistant: {response1.text}\n")
@@ -82,9 +82,7 @@ async def main() -> None:
alice_session = agent.create_session(session_id="tenant-alice-session-1")
response = await agent.run(
"Hi! I'm planning a trip to Italy. I love Renaissance art.", session=alice_session
)
response = await agent.run("Hi! I'm planning a trip to Italy. I love Renaissance art.", session=alice_session)
print("Alice: I'm planning a trip to Italy. I love Renaissance art.")
print(f"Assistant: {response.text}\n")
@@ -97,9 +95,7 @@ async def main() -> None:
bob_session = agent.create_session(session_id="tenant-bob-session-1")
response = await agent.run(
"Hey! I'm learning to cook Thai food. I just made pad thai.", session=bob_session
)
response = await agent.run("Hey! I'm learning to cook Thai food. I just made pad thai.", session=bob_session)
print("Bob: I'm learning to cook Thai food. I just made pad thai.")
print(f"Assistant: {response.text}\n")
@@ -111,10 +111,7 @@ async def main() -> None:
cosmos_key = os.getenv("AZURE_COSMOS_KEY")
if not cosmos_endpoint or not cosmos_database_name or not cosmos_container_name:
print(
"Please set AZURE_COSMOS_ENDPOINT, AZURE_COSMOS_DATABASE_NAME, "
"and AZURE_COSMOS_CONTAINER_NAME."
)
print("Please set AZURE_COSMOS_ENDPOINT, AZURE_COSMOS_DATABASE_NAME, and AZURE_COSMOS_CONTAINER_NAME.")
return
# Authentication: supports both managed identity/RBAC and key-based auth.
@@ -131,12 +128,15 @@ async def main() -> None:
else:
from azure.identity.aio import DefaultAzureCredential
async with DefaultAzureCredential() as credential, CosmosCheckpointStorage(
endpoint=cosmos_endpoint,
credential=credential,
database_name=cosmos_database_name,
container_name=cosmos_container_name,
) as checkpoint_storage:
async with (
DefaultAzureCredential() as credential,
CosmosCheckpointStorage(
endpoint=cosmos_endpoint,
credential=credential,
database_name=cosmos_database_name,
container_name=cosmos_container_name,
) as checkpoint_storage,
):
await _run_workflow(checkpoint_storage)
@@ -57,10 +57,7 @@ async def main() -> None:
return
if not cosmos_endpoint or not cosmos_database_name or not cosmos_container_name:
print(
"Please set AZURE_COSMOS_ENDPOINT, AZURE_COSMOS_DATABASE_NAME, "
"and AZURE_COSMOS_CONTAINER_NAME."
)
print("Please set AZURE_COSMOS_ENDPOINT, AZURE_COSMOS_DATABASE_NAME, and AZURE_COSMOS_CONTAINER_NAME.")
return
# Use a single AzureCliCredential for both Cosmos and Foundry,