From 64c68ca8576ccfde295d0cf1db51bd5c2b9bed82 Mon Sep 17 00:00:00 2001 From: Dineshsuriya D <43177361+droideronline@users.noreply.github.com> Date: Tue, 14 Apr 2026 15:00:31 +0530 Subject: [PATCH] Python: Skip get_final_response in OTel _finalize_stream when stream errored (#5232) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Python: Skip get_final_response in OTel _finalize_stream when stream errored When a streaming error occurs, _finalize_stream (a cleanup hook registered by AgentTelemetryLayer) was unconditionally calling get_final_response(), which triggers all registered result hooks including after_run context providers. This caused providers to fire incorrectly on error paths. Guard against this by checking result_stream._consumed: True only after StopAsyncIteration (normal completion), False when an exception was raised. The fix applies to both the chat client and agent telemetry layers. Closes #5231 * Python: Expose consumed/stream_error on ResponseStream and capture error in OTel span Address Copilot review feedback on #5232: - Add `_stream_error: Exception | None` to ResponseStream, set in __anext__'s except branch so cleanup hooks can inspect the failure. - Expose public `consumed` and `stream_error` properties to avoid coupling observability.py to private stream internals. - Update both _finalize_stream closures (chat and agent layers) to use the public properties and call capture_exception() with the stream error before returning early, ensuring the OTel span records the failure rather than closing silently. * Python: Address Copilot review feedback on stream error handling - Use stream_error is not None as the guard in _finalize_stream instead of not consumed, so the early-return path is keyed precisely to actual errors rather than any non-normal completion state. - Clear _stream_error after _run_cleanup_hooks() completes to avoid retaining the exception traceback (and any large object graphs it references) on the stream instance beyond the cleanup phase. * Python: Remove consumed/stream_error properties, use private attrs directly Per review feedback: since observability.py and _types.py are in the same package, accessing _stream_error directly is fine and the public properties are unnecessary. * Python: Fix Pyright reportPrivateUsage via inline ignore comments Keep _stream_error private (consistent with rest of ResponseStream), and suppress reportPrivateUsage at the call sites in observability.py with inline pyright: ignore comments — access is intentional within the package. --- python/packages/core/agent_framework/_types.py | 9 +++++++-- .../packages/core/agent_framework/observability.py | 12 ++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/python/packages/core/agent_framework/_types.py b/python/packages/core/agent_framework/_types.py index 87799d0848..584c1f0110 100644 --- a/python/packages/core/agent_framework/_types.py +++ b/python/packages/core/agent_framework/_types.py @@ -2816,6 +2816,7 @@ class ResponseStream(AsyncIterable[UpdateT], Generic[UpdateT, FinalT]): cleanup_hooks if cleanup_hooks is not None else [] ) self._cleanup_run: bool = False + self._stream_error: Exception | None = None self._inner_stream: ResponseStream[Any, Any] | None = None self._inner_stream_source: ResponseStream[Any, Any] | Awaitable[ResponseStream[Any, Any]] | None = None self._wrap_inner: bool = False @@ -2948,8 +2949,12 @@ class ResponseStream(AsyncIterable[UpdateT], Generic[UpdateT, FinalT]): await self._run_cleanup_hooks() await self.get_final_response() raise - except Exception: - await self._run_cleanup_hooks() + except Exception as exc: + self._stream_error = exc + try: + await self._run_cleanup_hooks() + finally: + self._stream_error = None raise if self._map_update is not None: update = self._map_update(update) # type: ignore[assignment] diff --git a/python/packages/core/agent_framework/observability.py b/python/packages/core/agent_framework/observability.py index 8d2eb05136..6998e5994f 100644 --- a/python/packages/core/agent_framework/observability.py +++ b/python/packages/core/agent_framework/observability.py @@ -1323,6 +1323,12 @@ class ChatTelemetryLayer(Generic[OptionsCoT]): from ._types import ChatResponse try: + if result_stream._stream_error is not None: # pyright: ignore[reportPrivateUsage] + # Stream errored; skip get_final_response() to avoid firing + # result hooks such as after_run context providers on error + # paths. Capture the error on the span before returning. + capture_exception(span=span, exception=result_stream._stream_error, timestamp=time_ns()) # pyright: ignore[reportPrivateUsage] + return response: ChatResponse[Any] = await result_stream.get_final_response() duration = duration_state.get("duration") response_attributes = _get_response_attributes(attributes, response) @@ -1579,6 +1585,12 @@ class AgentTelemetryLayer: from ._types import AgentResponse try: + if result_stream._stream_error is not None: # pyright: ignore[reportPrivateUsage] + # Stream errored; skip get_final_response() to avoid firing + # result hooks such as after_run context providers on error + # paths. Capture the error on the span before returning. + capture_exception(span=span, exception=result_stream._stream_error, timestamp=time_ns()) # pyright: ignore[reportPrivateUsage] + return response: AgentResponse[Any] = await result_stream.get_final_response() duration = duration_state.get("duration") response_attributes = _get_response_attributes(