mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
Python: Skip get_final_response in OTel _finalize_stream when stream errored (#5232)
* Python: Skip get_final_response in OTel _finalize_stream when stream errored When a streaming error occurs, _finalize_stream (a cleanup hook registered by AgentTelemetryLayer) was unconditionally calling get_final_response(), which triggers all registered result hooks including after_run context providers. This caused providers to fire incorrectly on error paths. Guard against this by checking result_stream._consumed: True only after StopAsyncIteration (normal completion), False when an exception was raised. The fix applies to both the chat client and agent telemetry layers. Closes #5231 * Python: Expose consumed/stream_error on ResponseStream and capture error in OTel span Address Copilot review feedback on #5232: - Add `_stream_error: Exception | None` to ResponseStream, set in __anext__'s except branch so cleanup hooks can inspect the failure. - Expose public `consumed` and `stream_error` properties to avoid coupling observability.py to private stream internals. - Update both _finalize_stream closures (chat and agent layers) to use the public properties and call capture_exception() with the stream error before returning early, ensuring the OTel span records the failure rather than closing silently. * Python: Address Copilot review feedback on stream error handling - Use stream_error is not None as the guard in _finalize_stream instead of not consumed, so the early-return path is keyed precisely to actual errors rather than any non-normal completion state. - Clear _stream_error after _run_cleanup_hooks() completes to avoid retaining the exception traceback (and any large object graphs it references) on the stream instance beyond the cleanup phase. * Python: Remove consumed/stream_error properties, use private attrs directly Per review feedback: since observability.py and _types.py are in the same package, accessing _stream_error directly is fine and the public properties are unnecessary. * Python: Fix Pyright reportPrivateUsage via inline ignore comments Keep _stream_error private (consistent with rest of ResponseStream), and suppress reportPrivateUsage at the call sites in observability.py with inline pyright: ignore comments — access is intentional within the package.
This commit is contained in:
committed by
GitHub
Unverified
parent
98e17764a4
commit
64c68ca857
@@ -2816,6 +2816,7 @@ class ResponseStream(AsyncIterable[UpdateT], Generic[UpdateT, FinalT]):
|
||||
cleanup_hooks if cleanup_hooks is not None else []
|
||||
)
|
||||
self._cleanup_run: bool = False
|
||||
self._stream_error: Exception | None = None
|
||||
self._inner_stream: ResponseStream[Any, Any] | None = None
|
||||
self._inner_stream_source: ResponseStream[Any, Any] | Awaitable[ResponseStream[Any, Any]] | None = None
|
||||
self._wrap_inner: bool = False
|
||||
@@ -2948,8 +2949,12 @@ class ResponseStream(AsyncIterable[UpdateT], Generic[UpdateT, FinalT]):
|
||||
await self._run_cleanup_hooks()
|
||||
await self.get_final_response()
|
||||
raise
|
||||
except Exception:
|
||||
await self._run_cleanup_hooks()
|
||||
except Exception as exc:
|
||||
self._stream_error = exc
|
||||
try:
|
||||
await self._run_cleanup_hooks()
|
||||
finally:
|
||||
self._stream_error = None
|
||||
raise
|
||||
if self._map_update is not None:
|
||||
update = self._map_update(update) # type: ignore[assignment]
|
||||
|
||||
@@ -1323,6 +1323,12 @@ class ChatTelemetryLayer(Generic[OptionsCoT]):
|
||||
from ._types import ChatResponse
|
||||
|
||||
try:
|
||||
if result_stream._stream_error is not None: # pyright: ignore[reportPrivateUsage]
|
||||
# Stream errored; skip get_final_response() to avoid firing
|
||||
# result hooks such as after_run context providers on error
|
||||
# paths. Capture the error on the span before returning.
|
||||
capture_exception(span=span, exception=result_stream._stream_error, timestamp=time_ns()) # pyright: ignore[reportPrivateUsage]
|
||||
return
|
||||
response: ChatResponse[Any] = await result_stream.get_final_response()
|
||||
duration = duration_state.get("duration")
|
||||
response_attributes = _get_response_attributes(attributes, response)
|
||||
@@ -1579,6 +1585,12 @@ class AgentTelemetryLayer:
|
||||
from ._types import AgentResponse
|
||||
|
||||
try:
|
||||
if result_stream._stream_error is not None: # pyright: ignore[reportPrivateUsage]
|
||||
# Stream errored; skip get_final_response() to avoid firing
|
||||
# result hooks such as after_run context providers on error
|
||||
# paths. Capture the error on the span before returning.
|
||||
capture_exception(span=span, exception=result_stream._stream_error, timestamp=time_ns()) # pyright: ignore[reportPrivateUsage]
|
||||
return
|
||||
response: AgentResponse[Any] = await result_stream.get_final_response()
|
||||
duration = duration_state.get("duration")
|
||||
response_attributes = _get_response_attributes(
|
||||
|
||||
Reference in New Issue
Block a user