Skip to content
14 changes: 13 additions & 1 deletion python/packages/core/agent_framework/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2816,6 +2816,7 @@ def __init__(
cleanup_hooks if cleanup_hooks is not None else []
)
self._cleanup_run: bool = False
self._stream_error: Exception | None = None
self._inner_stream: ResponseStream[Any, Any] | None = None
self._inner_stream_source: ResponseStream[Any, Any] | Awaitable[ResponseStream[Any, Any]] | None = None
self._wrap_inner: bool = False
Expand Down Expand Up @@ -2948,7 +2949,8 @@ async def __anext__(self) -> UpdateT:
await self._run_cleanup_hooks()
await self.get_final_response()
raise
except Exception:
except Exception as exc:
self._stream_error = exc
await self._run_cleanup_hooks()
Comment thread
droideronline marked this conversation as resolved.
Outdated
raise
if self._map_update is not None:
Expand Down Expand Up @@ -3112,6 +3114,16 @@ async def _run_cleanup_hooks(self) -> None:
if isawaitable(result):
await result

@property
def consumed(self) -> bool:
"""True if the stream completed normally (StopAsyncIteration was reached)."""
return self._consumed

@property
def stream_error(self) -> Exception | None:
Comment thread
droideronline marked this conversation as resolved.
Outdated
"""The exception that caused the stream to fail, or None if it completed normally."""
return self._stream_error

@property
def updates(self) -> Sequence[UpdateT]:
return self._updates
Expand Down
16 changes: 16 additions & 0 deletions python/packages/core/agent_framework/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -1323,6 +1323,14 @@ async def _finalize_stream() -> None:
from ._types import ChatResponse

try:
if not result_stream.consumed:
# Stream did not complete normally (e.g., it errored). Skip
# get_final_response() to avoid firing result hooks such as
# after_run context providers on error paths. Capture the
# stream error on the span so it is not silently swallowed.
if result_stream.stream_error is not None:
capture_exception(span=span, exception=result_stream.stream_error, timestamp=time_ns())
return
Comment thread
droideronline marked this conversation as resolved.
Outdated
response: ChatResponse[Any] = await result_stream.get_final_response()
duration = duration_state.get("duration")
response_attributes = _get_response_attributes(attributes, response)
Expand Down Expand Up @@ -1579,6 +1587,14 @@ async def _finalize_stream() -> None:
from ._types import AgentResponse

try:
if not result_stream.consumed:
# Stream did not complete normally (e.g., it errored). Skip
# get_final_response() to avoid firing result hooks such as
# after_run context providers on error paths. Capture the
# stream error on the span so it is not silently swallowed.
if result_stream.stream_error is not None:
capture_exception(span=span, exception=result_stream.stream_error, timestamp=time_ns())
return
response: AgentResponse[Any] = await result_stream.get_final_response()
duration = duration_state.get("duration")
response_attributes = _get_response_attributes(
Expand Down
Loading