-
Notifications
You must be signed in to change notification settings - Fork 142
fix: resolve race conditions in EventConsumer and SSE transport #780
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,5 +1,6 @@ | ||||||
| package io.a2a.server.events; | ||||||
|
|
||||||
| import java.util.concurrent.Executor; | ||||||
| import java.util.concurrent.Flow; | ||||||
|
|
||||||
| import io.a2a.spec.A2AError; | ||||||
|
|
@@ -19,22 +20,31 @@ | |||||
| public class EventConsumer { | ||||||
| private static final Logger LOGGER = LoggerFactory.getLogger(EventConsumer.class); | ||||||
| private final EventQueue queue; | ||||||
| private final Executor executor; | ||||||
| private volatile @Nullable Throwable error; | ||||||
| private volatile boolean cancelled = false; | ||||||
| private volatile boolean agentCompleted = false; | ||||||
| private volatile int pollTimeoutsAfterAgentCompleted = 0; | ||||||
| private volatile @Nullable TaskState lastSeenTaskState = null; | ||||||
| private volatile int pollTimeoutsWhileAwaitingFinal = 0; | ||||||
|
|
||||||
| private static final String ERROR_MSG = "Agent did not return any response"; | ||||||
| private static final int NO_WAIT = -1; | ||||||
| private static final int QUEUE_WAIT_MILLISECONDS = 500; | ||||||
| // In replicated scenarios, events can arrive hundreds of milliseconds after local agent completes | ||||||
| // Grace period allows Kafka replication to deliver late-arriving events | ||||||
| // 3 timeouts * 500ms = 1500ms grace period for replication delays | ||||||
| // Calculation: MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED * QUEUE_WAIT_MILLISECONDS = 1500ms | ||||||
| private static final int MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED = 3; | ||||||
| // Maximum time to wait for final event when awaitingFinalEvent is set | ||||||
| // If event doesn't arrive after this many timeouts, assume it won't arrive | ||||||
| // Calculation: MAX_POLL_TIMEOUTS_AWAITING_FINAL * QUEUE_WAIT_MILLISECONDS = 3000ms | ||||||
| private static final int MAX_AWAITING_FINAL_TIMEOUT_MS = 3000; | ||||||
| private static final int MAX_POLL_TIMEOUTS_AWAITING_FINAL = | ||||||
| MAX_AWAITING_FINAL_TIMEOUT_MS / QUEUE_WAIT_MILLISECONDS; | ||||||
|
|
||||||
| public EventConsumer(EventQueue queue) { | ||||||
| public EventConsumer(EventQueue queue, Executor executor) { | ||||||
| this.queue = queue; | ||||||
| this.executor = executor; | ||||||
| LOGGER.debug("EventConsumer created with queue {}", System.identityHashCode(queue)); | ||||||
| } | ||||||
|
|
||||||
|
|
@@ -51,9 +61,12 @@ public Flow.Publisher<EventQueueItem> consumeAll() { | |||||
| .withBackpressureStrategy(BackpressureStrategy.BUFFER) | ||||||
| .withBufferSize(256); | ||||||
| return ZeroPublisher.create(conf, tube -> { | ||||||
| boolean completed = false; | ||||||
| try { | ||||||
| while (true) { | ||||||
| // CRITICAL: Spawn polling loop on executor to avoid blocking the calling thread | ||||||
| // The lambda returns immediately, but polling continues on separate thread | ||||||
| executor.execute(() -> { | ||||||
| boolean completed = false; | ||||||
| try { | ||||||
| while (true) { | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The polling loop does not check the thread's interrupted status. Since this loop now runs on an executor thread, it should respect interruptions (e.g., during application shutdown). When the thread is interrupted, the loop should exit instead of continuing to poll.
Suggested change
|
||||||
| // Check if cancelled by client disconnect | ||||||
| if (cancelled) { | ||||||
| LOGGER.debug("EventConsumer detected cancellation, exiting polling loop for queue {}", System.identityHashCode(queue)); | ||||||
|
|
@@ -82,8 +95,9 @@ public Flow.Publisher<EventQueueItem> consumeAll() { | |||||
| item = queue.dequeueEventItem(QUEUE_WAIT_MILLISECONDS); | ||||||
| if (item == null) { | ||||||
| int queueSize = queue.size(); | ||||||
| LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, timeoutCount={}", | ||||||
| agentCompleted, queueSize, pollTimeoutsAfterAgentCompleted); | ||||||
| boolean awaitingFinal = queue.isAwaitingFinalEvent(); | ||||||
| LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, awaitingFinalEvent={}, timeoutCount={}, awaitingTimeoutCount={}", | ||||||
| agentCompleted, queueSize, awaitingFinal, pollTimeoutsAfterAgentCompleted, pollTimeoutsWhileAwaitingFinal); | ||||||
| // If agent completed, a poll timeout means no more events are coming | ||||||
| // MainEventBusProcessor has 500ms to distribute events from MainEventBus | ||||||
| // If we timeout with agentCompleted=true, all events have been distributed | ||||||
|
|
@@ -94,8 +108,38 @@ public Flow.Publisher<EventQueueItem> consumeAll() { | |||||
| // CRITICAL: Do NOT close if task is in interrupted state (INPUT_REQUIRED, AUTH_REQUIRED) | ||||||
| // Per A2A spec, interrupted states are NOT terminal - the stream must stay open | ||||||
| // for future state updates even after agent completes (agent will be re-invoked later). | ||||||
| // | ||||||
| // CRITICAL: Don't start timeout counter if we're awaiting a final event. | ||||||
| // The awaitingFinalEvent flag is set when MainQueue enqueues a final event | ||||||
| // but it hasn't been distributed to this ChildQueue yet. | ||||||
| // HOWEVER: If we've been waiting too long for the final event (>3s), give up and | ||||||
| // proceed with normal timeout logic to prevent infinite waiting. | ||||||
| boolean isInterruptedState = lastSeenTaskState != null && lastSeenTaskState.isInterrupted(); | ||||||
| if (agentCompleted && queueSize == 0 && !isInterruptedState) { | ||||||
|
|
||||||
| // Track how long we've been waiting for the final event. | ||||||
| // Three cases for the awaiting counter: | ||||||
| // awaitingFinal && queueSize == 0: final event enqueued in MainQueue but not yet | ||||||
| // distributed here — increment timeout counter and give up after MAX timeout. | ||||||
| // awaitingFinal && queueSize > 0: events are still in transit, do nothing — | ||||||
| // the counter is reset below once an event is successfully dequeued. | ||||||
| // !awaitingFinal: not waiting for anything — reset the counter (timeout case; | ||||||
| // the successful-dequeue reset happens below at the event-received path). | ||||||
| if (awaitingFinal && queueSize == 0) { | ||||||
| pollTimeoutsWhileAwaitingFinal++; | ||||||
| if (pollTimeoutsWhileAwaitingFinal >= MAX_POLL_TIMEOUTS_AWAITING_FINAL) { | ||||||
| LOGGER.debug("Waited {} timeouts for final event but it hasn't arrived - proceeding with normal timeout logic (queue={})", | ||||||
| pollTimeoutsWhileAwaitingFinal, System.identityHashCode(queue)); | ||||||
| // Clear the flag on the queue itself, not just the local variable | ||||||
| queue.clearAwaitingFinalEvent(); | ||||||
| awaitingFinal = false; // Also update local variable for this iteration | ||||||
| } | ||||||
| } else if (!awaitingFinal) { | ||||||
| // Poll timed out and we are not awaiting a final event: reset the counter. | ||||||
| // (The successful-dequeue reset is handled separately below.) | ||||||
| pollTimeoutsWhileAwaitingFinal = 0; | ||||||
| } | ||||||
|
|
||||||
| if (agentCompleted && queueSize == 0 && !isInterruptedState && !awaitingFinal) { | ||||||
| pollTimeoutsAfterAgentCompleted++; | ||||||
| if (pollTimeoutsAfterAgentCompleted >= MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED) { | ||||||
| LOGGER.debug("Agent completed with {} consecutive poll timeouts and empty queue, closing for graceful completion (queue={})", | ||||||
|
|
@@ -116,11 +160,16 @@ public Flow.Publisher<EventQueueItem> consumeAll() { | |||||
| LOGGER.debug("Agent completed but queue has {} pending events, resetting timeout counter and continuing to poll (queue={})", | ||||||
| queueSize, System.identityHashCode(queue)); | ||||||
| pollTimeoutsAfterAgentCompleted = 0; // Reset counter when events arrive | ||||||
| } else if (agentCompleted && awaitingFinal) { | ||||||
| LOGGER.debug("Agent completed, awaiting final event (timeout {}/{}), continuing to poll (queue={})", | ||||||
| pollTimeoutsWhileAwaitingFinal, MAX_POLL_TIMEOUTS_AWAITING_FINAL, System.identityHashCode(queue)); | ||||||
| pollTimeoutsAfterAgentCompleted = 0; // Reset counter while awaiting final | ||||||
| } | ||||||
| continue; | ||||||
| } | ||||||
| // Event received - reset timeout counter | ||||||
| // Event received - reset timeout counters | ||||||
| pollTimeoutsAfterAgentCompleted = 0; | ||||||
| pollTimeoutsWhileAwaitingFinal = 0; | ||||||
| event = item.getEvent(); | ||||||
| LOGGER.debug("EventConsumer received event: {} (queue={})", | ||||||
| event.getClass().getSimpleName(), System.identityHashCode(queue)); | ||||||
|
|
@@ -179,10 +228,11 @@ public Flow.Publisher<EventQueueItem> consumeAll() { | |||||
| // the stream-end signal can reach the client BEFORE the buffered final event, | ||||||
| // causing the client to close the connection and never receive the final event. | ||||||
| // This is especially important in replicated scenarios where events arrive via Kafka | ||||||
| // and timing is less deterministic. A small delay ensures the buffer flushes. | ||||||
| // and timing is less deterministic. A delay ensures the buffer flushes. | ||||||
| // Increased to 150ms to account for CI environment latency and JVM scheduling delays. | ||||||
| if (isFinalSent) { | ||||||
| try { | ||||||
| Thread.sleep(50); // 50ms to allow SSE buffer flush | ||||||
| Thread.sleep(150); // 150ms to allow SSE buffer flush in CI environments | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using Additionally, If the underlying stream library ( References
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The use of |
||||||
| } catch (InterruptedException e) { | ||||||
| Thread.currentThread().interrupt(); | ||||||
| } | ||||||
|
|
@@ -198,15 +248,17 @@ public Flow.Publisher<EventQueueItem> consumeAll() { | |||||
| return; | ||||||
| } | ||||||
|
Comment on lines
248
to
249
Contributor
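There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In this error path, the `completed` flag is not set before returning, so the `finally` block will still call `tube.complete()` after `tube.fail(t)` has already terminated the stream. Set the flag before failing the tube:
completed = true;
tube.fail(t);
return; |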
||||||
| } | ||||||
| } finally { | ||||||
| if (!completed) { | ||||||
| LOGGER.debug("EventConsumer finally block: calling tube.complete() for queue {}", System.identityHashCode(queue)); | ||||||
| tube.complete(); | ||||||
| LOGGER.debug("EventConsumer finally block: tube.complete() returned for queue {}", System.identityHashCode(queue)); | ||||||
| } else { | ||||||
| LOGGER.debug("EventConsumer finally block: completed=true, skipping tube.complete() for queue {}", System.identityHashCode(queue)); | ||||||
| } finally { | ||||||
| if (!completed) { | ||||||
| LOGGER.debug("EventConsumer finally block: calling tube.complete() for queue {}", System.identityHashCode(queue)); | ||||||
| tube.complete(); | ||||||
| LOGGER.debug("EventConsumer finally block: tube.complete() returned for queue {}", System.identityHashCode(queue)); | ||||||
| } else { | ||||||
| LOGGER.debug("EventConsumer finally block: completed=true, skipping tube.complete() for queue {}", System.identityHashCode(queue)); | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
| }); | ||||||
| // Lambda returns immediately - polling continues on executor thread | ||||||
| }); | ||||||
| } | ||||||
|
|
||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The calculation for `MAX_POLL_TIMEOUTS_AWAITING_FINAL` uses integer division. This could result in a shorter-than-intended timeout if `MAX_AWAITING_FINAL_TIMEOUT_MS` is not perfectly divisible by `QUEUE_WAIT_MILLISECONDS`. For instance, with a 3100ms timeout and 500ms wait time, this would result in 6 timeouts (3000ms total) instead of 7 (3500ms) to meet the minimum duration. To ensure the timeout is always at least the specified duration, it's better to round up, e.g. `(MAX_AWAITING_FINAL_TIMEOUT_MS + QUEUE_WAIT_MILLISECONDS - 1) / QUEUE_WAIT_MILLISECONDS`.
References