fix: resolve race conditions in EventConsumer and SSE transport #780
kabir wants to merge 1 commit into a2aproject:main from
Conversation
…ansport

Fixed multiple race conditions causing intermittent test failures:

1. SSE Subscription Race: Removed the executor.execute() wrapper in REST/JSONRPC routes that delayed subscription by 100-600ms, causing events to be lost when EventConsumer started emitting before the subscriber was ready.
2. Cancel Race: Changed onCancelTask to use consumeAndBreakOnInterrupt() instead of consumeAll(). Removed the unused ResultAggregator.consumeAll() method since cancel was its only caller.
3. EventConsumer Threading: Moved the EventConsumer polling loop to an executor thread to prevent blocking the caller, ensuring subscription happens immediately without delay.
4. EventQueue Improvements: Enhanced awaitingFinalEvent tracking with timeout guards (max 3s wait) to prevent infinite waiting if the final event never arrives due to distribution delays in replicated scenarios.
5. SSE Buffer Flush: Increased the sleep delay from 50ms to 150ms to account for CI environment latency and ensure buffered events flush before the stream ends.
6. Counter Logic Cleanup: Improved the pollTimeoutsWhileAwaitingFinal reset logic to only reset when not awaiting a final event. Calculated the timeout constant from the base timeout value for better maintainability.

These fixes address intermittent failures in testNonBlockingWithMultipleMessages and testCancelTaskSuccess. Validated with 900+ test iterations in CI (9 JDK/transport combinations × 100 iterations) with only 1 failure, down from a ~10% failure rate.

Co-Authored-By: Emmanuel Hugonnet <ehugonne@redhat.com>
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
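The timeout-guard and counter-reset behavior described in items 4 and 6 can be sketched roughly as follows. This is a hypothetical illustration, not the actual EventQueue code: the class and method names (FinalEventGuardDemo, onPollTimeout) are invented, and the constant values assume a 500ms poll interval against a 3s cap.

```java
// Hypothetical sketch of the "max 3s wait for a final event" guard:
// count consecutive empty polls while awaiting the final event, and
// give up once the accumulated wait reaches the cap. Names and values
// are illustrative only.
public class FinalEventGuardDemo {
    static final int QUEUE_WAIT_MILLISECONDS = 500;        // assumed poll interval
    static final int MAX_AWAITING_FINAL_TIMEOUT_MS = 3000; // assumed 3s cap
    static final int MAX_POLL_TIMEOUTS_AWAITING_FINAL =
            MAX_AWAITING_FINAL_TIMEOUT_MS / QUEUE_WAIT_MILLISECONDS; // 6 empty polls

    int pollTimeoutsWhileAwaitingFinal = 0;

    /** Returns true when we should stop waiting for the final event. */
    boolean onPollTimeout(boolean awaitingFinalEvent) {
        if (!awaitingFinalEvent) {
            // Item 6: only reset the counter when NOT awaiting the final event,
            // so intermediate events cannot extend the wait indefinitely.
            pollTimeoutsWhileAwaitingFinal = 0;
            return false;
        }
        return ++pollTimeoutsWhileAwaitingFinal >= MAX_POLL_TIMEOUTS_AWAITING_FINAL;
    }

    public static void main(String[] args) {
        FinalEventGuardDemo guard = new FinalEventGuardDemo();
        int polls = 0;
        boolean giveUp = false;
        while (!giveUp) {
            giveUp = guard.onPollTimeout(true); // every poll comes back empty
            polls++;
        }
        System.out.println("gave up after " + polls + " empty polls");
    }
}
```

With the assumed values this gives up after 6 empty polls (3000ms total), bounding the wait instead of blocking forever when the final event is lost in a replicated deployment.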
/gemini review
Code Review
This pull request enhances SSE streaming reliability by implementing synchronous subscriptions, preventing buffering via headers and queue limits, and offloading polling to a dedicated executor. It also introduces a timeout mechanism for final events. Feedback recommends restoring error handling for user-provided executor calls, replacing magic numbers with constants for flush delays, and refining timeout calculations to avoid rounding errors.
```diff
-            throw new io.a2a.spec.InternalError("Agent cancellation failed: " + e.getMessage());
-        }
+        agentExecutor.cancel(cancelRequestContext, emitter);
```
The try-catch block that previously wrapped the call to agentExecutor.cancel() has been removed. Since AgentExecutor is a user-provided implementation, it could throw an unexpected exception. Without this protection, any exception from cancel() will be unhandled and could crash the request handler thread.
To ensure robustness, the call should be wrapped in a try-catch block. Exceptions should be caught and enqueued as error events via the emitter, which aligns with the event-driven error handling approach used elsewhere in this pull request. Ensure that any application-level errors used here, such as InternalError, extend Exception or RuntimeException rather than java.lang.Error.
```java
try {
    agentExecutor.cancel(cancelRequestContext, emitter);
} catch (TaskNotCancelableError e) {
    // Expected error - log and enqueue
    LOGGER.info("Task {} is not cancelable, agent threw: {}", task.id(), e.getMessage());
    emitter.fail(e);
} catch (A2AError e) {
    // Other A2A errors - log and enqueue
    LOGGER.warn("Agent cancellation threw A2AError for task {}: {} - {}",
            task.id(), e.getClass().getSimpleName(), e.getMessage(), e);
    emitter.fail(e);
} catch (Exception e) {
    // Unexpected errors - log and enqueue as InternalError
    LOGGER.error("Agent cancellation threw unexpected exception for task {}", task.id(), e);
    emitter.fail(new io.a2a.spec.InternalError("Agent cancellation failed: " + e.getMessage(), e));
}
```
References
- Application-level errors should extend Exception or RuntimeException, not java.lang.Error.
```diff
         if (isFinalSent) {
             try {
-                Thread.sleep(50); // 50ms to allow SSE buffer flush
+                Thread.sleep(150); // 150ms to allow SSE buffer flush in CI environments
```
Using Thread.sleep() to mitigate a race condition is brittle. While increasing the delay to 150ms makes failure less likely, it doesn't eliminate the race condition where the stream completion signal might arrive before the final event is flushed. This can still cause intermittent failures.
Additionally, 150 is a magic number. It should be defined as a named constant (e.g., SSE_TUBE_FLUSH_DELAY_MS). Since this constant may be relevant to other components using SSE, consider placing it in a shared constants class or module to ensure consistency across the repository.
If the underlying stream library (mutiny.zero.Tube) provides a more deterministic way to ensure all sent items are flushed before completing the stream, that should be preferred over a fixed sleep.
References
- Constants that are used across multiple modules or components should be moved to a shared location to avoid duplication and ensure consistency.
- Stricter coding practices, such as avoiding magic numbers, are required for production code.
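The named-constant part of this feedback could look roughly like the sketch below. The class name SseTimings and the constant's location are assumptions for illustration; only the 150ms value comes from the diff.

```java
// Hypothetical sketch: hoist the 150ms magic number into a shared named
// constant, as the review suggests. SseTimings and its package placement
// are invented names, not the repository's actual layout.
public class SseTimings {
    /** Delay allowing buffered SSE events to flush before the stream completes
     *  (sized for CI-environment latency). */
    public static final long SSE_TUBE_FLUSH_DELAY_MS = 150L;

    public static void main(String[] args) {
        // The call site would then read:
        //     Thread.sleep(SseTimings.SSE_TUBE_FLUSH_DELAY_MS);
        System.out.println(SSE_TUBE_FLUSH_DELAY_MS);
    }
}
```

Centralizing the value means any other SSE transport component that needs the same grace period references one definition instead of repeating the literal.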
```java
private static final int MAX_POLL_TIMEOUTS_AWAITING_FINAL =
        MAX_AWAITING_FINAL_TIMEOUT_MS / QUEUE_WAIT_MILLISECONDS;
```
The calculation for MAX_POLL_TIMEOUTS_AWAITING_FINAL uses integer division. This could result in a shorter-than-intended timeout if MAX_AWAITING_FINAL_TIMEOUT_MS is not perfectly divisible by QUEUE_WAIT_MILLISECONDS. For instance, with a 3100ms timeout and 500ms wait time, this would result in 6 timeouts (3000ms total) instead of 7 (3500ms) to meet the minimum duration. To ensure the timeout is always at least the specified duration, it's better to round up.
```java
(MAX_AWAITING_FINAL_TIMEOUT_MS + QUEUE_WAIT_MILLISECONDS - 1) / QUEUE_WAIT_MILLISECONDS;
```
References
- Stricter coding practices for timeouts and durations should be followed in production code.
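The difference between floor and ceiling division here is easy to verify numerically. The sketch below uses the reviewer's example values (3100ms timeout, 500ms wait); the constant names mirror the diff but the class is illustrative.

```java
// Demonstrates the rounding issue the review describes: integer division
// truncates, so a 3100ms budget yields only 6 poll timeouts (3000ms),
// while ceiling division yields 7 (3500ms), meeting the minimum duration.
public class CeilDivDemo {
    static final int MAX_AWAITING_FINAL_TIMEOUT_MS = 3100; // reviewer's example
    static final int QUEUE_WAIT_MILLISECONDS = 500;

    public static void main(String[] args) {
        int floorDiv = MAX_AWAITING_FINAL_TIMEOUT_MS / QUEUE_WAIT_MILLISECONDS;
        int ceilDiv = (MAX_AWAITING_FINAL_TIMEOUT_MS + QUEUE_WAIT_MILLISECONDS - 1)
                / QUEUE_WAIT_MILLISECONDS;
        System.out.println(floorDiv + " " + ceilDiv); // 6 vs 7 poll timeouts
    }
}
```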
Code Review
This pull request addresses race conditions and buffering issues in Server-Sent Events (SSE) streaming. Key changes include moving the event polling loop to a dedicated executor within EventConsumer, subscribing synchronously in the server routes to ensure events aren't lost, and adding specific SSE headers to prevent buffering. Additionally, a timeout mechanism was added to wait for final events in distributed environments. Review feedback highlights a fragile 150ms sleep for buffer flushing that should be handled at the transport layer, a missing interruption check in the new polling loop, and a potential violation of reactive stream specs where both error and completion signals could be sent.
```diff
         if (isFinalSent) {
             try {
-                Thread.sleep(50); // 50ms to allow SSE buffer flush
+                Thread.sleep(150); // 150ms to allow SSE buffer flush in CI environments
```
The use of Thread.sleep(150) to manage buffer flushing is a fragile workaround that introduces a mandatory 150ms latency to the end of every successful stream, including blocking calls. This significantly impacts performance for fast agents. Furthermore, EventConsumer is a common component and should not contain transport-specific logic for 'SSE buffer flush'. A more robust solution would be to ensure the transport layer (e.g., MultiSseSupport) handles the graceful shutdown after the last write completion callback, rather than relying on a hardcoded delay in the event producer.
```java
executor.execute(() -> {
    boolean completed = false;
    try {
        while (true) {
```
The polling loop does not check the thread's interrupted status. Since this loop now runs on an executor thread, it should respect interruptions (e.g., during application shutdown). When queue.dequeueEventItem returns null due to an InterruptedException (which it catches and re-interrupts internally), the loop currently continues, potentially leading to a spin-lock or preventing graceful thread termination.
```diff
-                while (true) {
+                while (!Thread.currentThread().isInterrupted()) {
```
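A self-contained illustration of the interruption-aware polling pattern the review recommends is below. It does not use the project's EventQueue; the BlockingQueue stand-in and all names are invented for the demo.

```java
import java.util.concurrent.*;

// Hypothetical sketch: a polling loop on an executor thread that checks the
// interrupted flag, so cancellation/shutdown terminates it instead of spinning.
// The queue here is a plain BlockingQueue, not the project's EventQueue.
public class InterruptiblePollDemo {
    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();

        Future<?> pollTask = executor.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String item = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (item != null) {
                        System.out.println("got " + item);
                    }
                } catch (InterruptedException e) {
                    // Restore the flag; the loop condition then exits cleanly.
                    Thread.currentThread().interrupt();
                }
            }
            System.out.println("loop exited");
        });

        queue.put("event-1");
        Thread.sleep(300);        // let the poller consume the event
        pollTask.cancel(true);    // interrupts the polling thread
        executor.shutdown();
        executor.awaitTermination(2, TimeUnit.SECONDS);
    }
}
```

Without the isInterrupted() check, a dequeue helper that swallows InterruptedException (re-interrupting and returning null) would leave this loop spinning forever after cancellation.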
```java
    return;
}
```
In this error path, the completed flag is not set to true. This causes the finally block at line 251 to call tube.complete() after tube.fail(t) has already been invoked. Reactive stream specifications dictate that a stream should terminate with either an error or a completion signal, but not both. Setting completed = true here would prevent the redundant completion signal.
```java
completed = true;
tube.fail(t);
return;
```
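The single-terminal-signal rule can be demonstrated with a tiny stand-in for the tube. Everything here is a mock (FakeTube is not mutiny.zero's Tube); it only shows why the completed flag must be set on the error path.

```java
// Hypothetical sketch of the completed-flag pattern: a reactive stream must
// receive exactly one terminal signal (fail OR complete), never both. FakeTube
// is a mock, not the real mutiny.zero Tube API.
public class TerminalSignalDemo {
    static class FakeTube {
        void fail(Throwable t) { System.out.println("fail: " + t.getMessage()); }
        void complete()        { System.out.println("complete"); }
    }

    public static void main(String[] args) {
        FakeTube tube = new FakeTube();
        boolean completed = false;
        try {
            throw new RuntimeException("boom"); // simulate the error path
        } catch (Throwable t) {
            completed = true; // mark that a terminal signal has been sent
            tube.fail(t);
        } finally {
            if (!completed) {
                tube.complete(); // reached only when no error signal was sent
            }
        }
    }
}
```

Without `completed = true` in the catch block, the finally clause would emit a second terminal signal (`complete` after `fail`), which reactive stream specifications forbid.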