Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -299,15 +299,15 @@ public void invokeJSONRPCHandler(@Body String body, RoutingContext rc) {
.putHeader(CONTENT_TYPE, APPLICATION_JSON)
.end(serializeResponse(error));
} else if (streaming) {
final Multi<? extends A2AResponse<?>> finalStreamingResponse = streamingResponse;
executor.execute(() -> {
// Convert Multi<A2AResponse> to Multi<String> with SSE formatting
AtomicLong eventIdCounter = new AtomicLong(0);
Multi<String> sseEvents = finalStreamingResponse
.map(response -> SseFormatter.formatResponseAsSSE(response, eventIdCounter.getAndIncrement()));
// Write SSE-formatted strings to HTTP response
MultiSseSupport.writeSseStrings(sseEvents, rc, context);
});
// Convert Multi<A2AResponse> to Multi<String> with SSE formatting
// CRITICAL: Subscribe synchronously to avoid race condition where EventConsumer
// starts emitting events before MultiSseSupport subscribes. The executor.execute()
// wrapper caused 100-600ms delays before subscription, causing events to be lost.
AtomicLong eventIdCounter = new AtomicLong(0);
Multi<String> sseEvents = streamingResponse
.map(response -> SseFormatter.formatResponseAsSSE(response, eventIdCounter.getAndIncrement()));
// Write SSE-formatted strings to HTTP response
MultiSseSupport.writeSseStrings(sseEvents, rc, context);

} else {
rc.response()
Expand Down Expand Up @@ -783,7 +783,17 @@ public void onNext(String sseEvent) {
if (headers.get(CONTENT_TYPE) == null) {
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
}
// Additional SSE headers to prevent buffering
headers.set("Cache-Control", "no-cache");
headers.set("X-Accel-Buffering", "no"); // Disable nginx buffering
response.setChunked(true);

// CRITICAL: Disable write queue max size to prevent buffering
// Vert.x buffers writes by default - we need immediate flushing for SSE
response.setWriteQueueMaxSize(1);

// Send initial SSE comment to kickstart the stream
response.write(": SSE stream started\n\n");
}

// Write SSE-formatted string to response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,15 @@ public void sendMessageStreaming(@Body String body, RoutingContext rc) {
if (error != null) {
sendResponse(rc, error);
} else if (streamingResponse != null) {
final HTTPRestStreamingResponse finalStreamingResponse = streamingResponse;
executor.execute(() -> {
// Convert Flow.Publisher<String> (JSON) to Multi<String> (SSE-formatted)
AtomicLong eventIdCounter = new AtomicLong(0);
Multi<String> sseEvents = Multi.createFrom().publisher(finalStreamingResponse.getPublisher())
.map(json -> SseFormatter.formatJsonAsSSE(json, eventIdCounter.getAndIncrement()));
// Write SSE-formatted strings to HTTP response
MultiSseSupport.writeSseStrings(sseEvents, rc, context);
});
// Convert Flow.Publisher<String> (JSON) to Multi<String> (SSE-formatted)
// CRITICAL: Subscribe synchronously to avoid race condition where EventConsumer
// starts emitting events before MultiSseSupport subscribes. The executor.execute()
// wrapper caused 100-600ms delays before subscription, causing events to be lost.
AtomicLong eventIdCounter = new AtomicLong(0);
Multi<String> sseEvents = Multi.createFrom().publisher(streamingResponse.getPublisher())
.map(json -> SseFormatter.formatJsonAsSSE(json, eventIdCounter.getAndIncrement()));
// Write SSE-formatted strings to HTTP response
MultiSseSupport.writeSseStrings(sseEvents, rc, context);
}
}
}
Expand Down Expand Up @@ -431,15 +431,15 @@ public void subscribeToTask(RoutingContext rc) {
if (error != null) {
sendResponse(rc, error);
} else if (streamingResponse != null) {
final HTTPRestStreamingResponse finalStreamingResponse = streamingResponse;
executor.execute(() -> {
// Convert Flow.Publisher<String> (JSON) to Multi<String> (SSE-formatted)
AtomicLong eventIdCounter = new AtomicLong(0);
Multi<String> sseEvents = Multi.createFrom().publisher(finalStreamingResponse.getPublisher())
.map(json -> SseFormatter.formatJsonAsSSE(json, eventIdCounter.getAndIncrement()));
// Write SSE-formatted strings to HTTP response
MultiSseSupport.writeSseStrings(sseEvents, rc, context);
});
// Convert Flow.Publisher<String> (JSON) to Multi<String> (SSE-formatted)
// CRITICAL: Subscribe synchronously to avoid race condition where EventConsumer
// starts emitting events before MultiSseSupport subscribes. The executor.execute()
// wrapper caused 100-600ms delays before subscription, causing events to be lost.
AtomicLong eventIdCounter = new AtomicLong(0);
Multi<String> sseEvents = Multi.createFrom().publisher(streamingResponse.getPublisher())
.map(json -> SseFormatter.formatJsonAsSSE(json, eventIdCounter.getAndIncrement()));
// Write SSE-formatted strings to HTTP response
MultiSseSupport.writeSseStrings(sseEvents, rc, context);
}
}
}
Expand Down
90 changes: 71 additions & 19 deletions server-common/src/main/java/io/a2a/server/events/EventConsumer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.a2a.server.events;

import java.util.concurrent.Executor;
import java.util.concurrent.Flow;

import io.a2a.spec.A2AError;
Expand All @@ -19,22 +20,31 @@
public class EventConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(EventConsumer.class);
private final EventQueue queue;
private final Executor executor;
private volatile @Nullable Throwable error;
private volatile boolean cancelled = false;
private volatile boolean agentCompleted = false;
private volatile int pollTimeoutsAfterAgentCompleted = 0;
private volatile @Nullable TaskState lastSeenTaskState = null;
private volatile int pollTimeoutsWhileAwaitingFinal = 0;

private static final String ERROR_MSG = "Agent did not return any response";
private static final int NO_WAIT = -1;
private static final int QUEUE_WAIT_MILLISECONDS = 500;
// In replicated scenarios, events can arrive hundreds of milliseconds after local agent completes
// Grace period allows Kafka replication to deliver late-arriving events
// 3 timeouts * 500ms = 1500ms grace period for replication delays
// Calculation: MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED * QUEUE_WAIT_MILLISECONDS = 1500ms
private static final int MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED = 3;
// Maximum time to wait for final event when awaitingFinalEvent is set
// If event doesn't arrive after this many timeouts, assume it won't arrive
// Calculation: MAX_POLL_TIMEOUTS_AWAITING_FINAL * QUEUE_WAIT_MILLISECONDS = 3000ms
private static final int MAX_AWAITING_FINAL_TIMEOUT_MS = 3000;
private static final int MAX_POLL_TIMEOUTS_AWAITING_FINAL =
MAX_AWAITING_FINAL_TIMEOUT_MS / QUEUE_WAIT_MILLISECONDS;
Comment on lines +42 to +43
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The calculation for MAX_POLL_TIMEOUTS_AWAITING_FINAL uses integer division, which can produce a shorter-than-intended timeout whenever MAX_AWAITING_FINAL_TIMEOUT_MS is not evenly divisible by QUEUE_WAIT_MILLISECONDS. For instance, with a 3100ms timeout and a 500ms wait, integer division yields 6 timeouts (3000ms total), falling short of the 3100ms minimum; rounding up would yield 7 timeouts (3500ms), which satisfies it. To guarantee the effective timeout is always at least the specified duration, round up instead.

    (MAX_AWAITING_FINAL_TIMEOUT_MS + QUEUE_WAIT_MILLISECONDS - 1) / QUEUE_WAIT_MILLISECONDS;
References
  1. Stricter coding practices for timeouts and durations should be followed in production code.


public EventConsumer(EventQueue queue) {
public EventConsumer(EventQueue queue, Executor executor) {
this.queue = queue;
this.executor = executor;
LOGGER.debug("EventConsumer created with queue {}", System.identityHashCode(queue));
}

Expand All @@ -51,9 +61,12 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
.withBackpressureStrategy(BackpressureStrategy.BUFFER)
.withBufferSize(256);
return ZeroPublisher.create(conf, tube -> {
boolean completed = false;
try {
while (true) {
// CRITICAL: Spawn polling loop on executor to avoid blocking the calling thread
// The lambda returns immediately, but polling continues on separate thread
executor.execute(() -> {
boolean completed = false;
try {
while (true) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The polling loop does not check the thread's interrupted status. Since this loop now runs on an executor thread, it should respect interruption (e.g., during application shutdown). When queue.dequeueEventItem returns null due to an InterruptedException (which it catches and re-interrupts internally), the loop currently continues, potentially resulting in a busy-wait loop and preventing graceful thread termination.

Suggested change
while (true) {
while (!Thread.currentThread().isInterrupted()) {

// Check if cancelled by client disconnect
if (cancelled) {
LOGGER.debug("EventConsumer detected cancellation, exiting polling loop for queue {}", System.identityHashCode(queue));
Expand Down Expand Up @@ -82,8 +95,9 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
item = queue.dequeueEventItem(QUEUE_WAIT_MILLISECONDS);
if (item == null) {
int queueSize = queue.size();
LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, timeoutCount={}",
agentCompleted, queueSize, pollTimeoutsAfterAgentCompleted);
boolean awaitingFinal = queue.isAwaitingFinalEvent();
LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, awaitingFinalEvent={}, timeoutCount={}, awaitingTimeoutCount={}",
agentCompleted, queueSize, awaitingFinal, pollTimeoutsAfterAgentCompleted, pollTimeoutsWhileAwaitingFinal);
// If agent completed, a poll timeout means no more events are coming
// MainEventBusProcessor has 500ms to distribute events from MainEventBus
// If we timeout with agentCompleted=true, all events have been distributed
Expand All @@ -94,8 +108,38 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
// CRITICAL: Do NOT close if task is in interrupted state (INPUT_REQUIRED, AUTH_REQUIRED)
// Per A2A spec, interrupted states are NOT terminal - the stream must stay open
// for future state updates even after agent completes (agent will be re-invoked later).
//
// CRITICAL: Don't start timeout counter if we're awaiting a final event.
// The awaitingFinalEvent flag is set when MainQueue enqueues a final event
// but it hasn't been distributed to this ChildQueue yet.
// HOWEVER: If we've been waiting too long for the final event (>3s), give up and
// proceed with normal timeout logic to prevent infinite waiting.
boolean isInterruptedState = lastSeenTaskState != null && lastSeenTaskState.isInterrupted();
if (agentCompleted && queueSize == 0 && !isInterruptedState) {

// Track how long we've been waiting for the final event.
// Three cases for the awaiting counter:
// awaitingFinal && queueSize == 0: final event enqueued in MainQueue but not yet
// distributed here — increment timeout counter and give up after MAX timeout.
// awaitingFinal && queueSize > 0: events are still in transit, do nothing —
// the counter is reset below once an event is successfully dequeued.
// !awaitingFinal: not waiting for anything — reset the counter (timeout case;
// the successful-dequeue reset happens below at the event-received path).
if (awaitingFinal && queueSize == 0) {
pollTimeoutsWhileAwaitingFinal++;
if (pollTimeoutsWhileAwaitingFinal >= MAX_POLL_TIMEOUTS_AWAITING_FINAL) {
LOGGER.debug("Waited {} timeouts for final event but it hasn't arrived - proceeding with normal timeout logic (queue={})",
pollTimeoutsWhileAwaitingFinal, System.identityHashCode(queue));
// Clear the flag on the queue itself, not just the local variable
queue.clearAwaitingFinalEvent();
awaitingFinal = false; // Also update local variable for this iteration
}
} else if (!awaitingFinal) {
// Poll timed out and we are not awaiting a final event: reset the counter.
// (The successful-dequeue reset is handled separately below.)
pollTimeoutsWhileAwaitingFinal = 0;
}

if (agentCompleted && queueSize == 0 && !isInterruptedState && !awaitingFinal) {
pollTimeoutsAfterAgentCompleted++;
if (pollTimeoutsAfterAgentCompleted >= MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED) {
LOGGER.debug("Agent completed with {} consecutive poll timeouts and empty queue, closing for graceful completion (queue={})",
Expand All @@ -116,11 +160,16 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
LOGGER.debug("Agent completed but queue has {} pending events, resetting timeout counter and continuing to poll (queue={})",
queueSize, System.identityHashCode(queue));
pollTimeoutsAfterAgentCompleted = 0; // Reset counter when events arrive
} else if (agentCompleted && awaitingFinal) {
LOGGER.debug("Agent completed, awaiting final event (timeout {}/{}), continuing to poll (queue={})",
pollTimeoutsWhileAwaitingFinal, MAX_POLL_TIMEOUTS_AWAITING_FINAL, System.identityHashCode(queue));
pollTimeoutsAfterAgentCompleted = 0; // Reset counter while awaiting final
}
continue;
}
// Event received - reset timeout counter
// Event received - reset timeout counters
pollTimeoutsAfterAgentCompleted = 0;
pollTimeoutsWhileAwaitingFinal = 0;
event = item.getEvent();
LOGGER.debug("EventConsumer received event: {} (queue={})",
event.getClass().getSimpleName(), System.identityHashCode(queue));
Expand Down Expand Up @@ -179,10 +228,11 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
// the stream-end signal can reach the client BEFORE the buffered final event,
// causing the client to close the connection and never receive the final event.
// This is especially important in replicated scenarios where events arrive via Kafka
// and timing is less deterministic. A small delay ensures the buffer flushes.
// and timing is less deterministic. A delay ensures the buffer flushes.
// Increased to 150ms to account for CI environment latency and JVM scheduling delays.
if (isFinalSent) {
try {
Thread.sleep(50); // 50ms to allow SSE buffer flush
Thread.sleep(150); // 150ms to allow SSE buffer flush in CI environments
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using Thread.sleep() to mitigate a race condition is brittle. While increasing the delay to 150ms makes failure less likely, it doesn't eliminate the race condition where the stream completion signal might arrive before the final event is flushed. This can still cause intermittent failures.

Additionally, 150 is a magic number. It should be defined as a named constant (e.g., SSE_TUBE_FLUSH_DELAY_MS). Since this constant may be relevant to other components using SSE, consider placing it in a shared constants class or module to ensure consistency across the repository.

If the underlying stream library (mutiny.zero.Tube) provides a more deterministic way to ensure all sent items are flushed before completing the stream, that should be preferred over a fixed sleep.

References
  1. Constants that are used across multiple modules or components should be moved to a shared location to avoid duplication and ensure consistency.
  2. Stricter coding practices, such as avoiding magic numbers, are required for production code.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The use of Thread.sleep(150) to manage buffer flushing is a fragile workaround that introduces a mandatory 150ms latency to the end of every successful stream, including blocking calls. This significantly impacts performance for fast agents. Furthermore, EventConsumer is a common component and should not contain transport-specific logic for 'SSE buffer flush'. A more robust solution would be to ensure the transport layer (e.g., MultiSseSupport) handles the graceful shutdown after the last write completion callback, rather than relying on a hardcoded delay in the event producer.

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Expand All @@ -198,15 +248,17 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
return;
}
Comment on lines 248 to 249
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In this error path, the completed flag is not set to true. This causes the finally block at line 251 to call tube.complete() after tube.fail(t) has already been invoked. Reactive stream specifications dictate that a stream should terminate with either an error or a completion signal, but not both. Setting completed = true here would prevent the redundant completion signal.

                        completed = true;
                        tube.fail(t);
                        return;

}
} finally {
if (!completed) {
LOGGER.debug("EventConsumer finally block: calling tube.complete() for queue {}", System.identityHashCode(queue));
tube.complete();
LOGGER.debug("EventConsumer finally block: tube.complete() returned for queue {}", System.identityHashCode(queue));
} else {
LOGGER.debug("EventConsumer finally block: completed=true, skipping tube.complete() for queue {}", System.identityHashCode(queue));
} finally {
if (!completed) {
LOGGER.debug("EventConsumer finally block: calling tube.complete() for queue {}", System.identityHashCode(queue));
tube.complete();
LOGGER.debug("EventConsumer finally block: tube.complete() returned for queue {}", System.identityHashCode(queue));
} else {
LOGGER.debug("EventConsumer finally block: completed=true, skipping tube.complete() for queue {}", System.identityHashCode(queue));
}
}
}
});
// Lambda returns immediately - polling continues on executor thread
});
}

Expand Down
45 changes: 45 additions & 0 deletions server-common/src/main/java/io/a2a/server/events/EventQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,36 @@ public void taskDone() {
*/
public abstract int size();

/**
 * Reports whether a final event has been announced for this queue but has not
 * yet been delivered to it.
 * <p>
 * EventConsumer consults this to decide whether to keep polling an empty queue:
 * a final event may still be in transit through MainEventBusProcessor.
 * </p>
 * <p>
 * For MainQueue this always yields {@code false} (a MainQueue cannot be consumed).
 * For ChildQueue it yields {@code true} once {@link ChildQueue#expectFinalEvent()}
 * has been called and the final event has not arrived yet.
 * </p>
 *
 * @return {@code true} while a final event is still outstanding, {@code false} otherwise
 */
public boolean isAwaitingFinalEvent() {
    // Base behavior: nothing outstanding. ChildQueue overrides this with real tracking.
    return false;
}

/**
 * Resets the awaiting-final-event state, if this queue tracks any.
 * <p>
 * The base implementation intentionally does nothing, since only ChildQueue
 * maintains this state; ChildQueue overrides this method to drop the flag.
 * </p>
 */
public void clearAwaitingFinalEvent() {
    // Deliberate no-op: state is tracked (and cleared) only in ChildQueue.
}

/**
* Closes this event queue gracefully, allowing pending events to be consumed.
*/
Expand Down Expand Up @@ -757,6 +787,11 @@ public int size() {
return queue.size();
}

@Override
public boolean isAwaitingFinalEvent() {
    // Reflects whether expectFinalEvent() was called and the final event
    // has not yet been dequeued; field is volatile per the class's conventions.
    return awaitingFinalEvent;
}

@Override
public void awaitQueuePollerStart() throws InterruptedException {
parent.awaitQueuePollerStart();
Expand Down Expand Up @@ -790,6 +825,16 @@ void expectFinalEvent() {
LOGGER.debug("ChildQueue {} now awaiting final event", System.identityHashCode(this));
}

/**
 * Drops the awaiting-final-event flag after EventConsumer has given up waiting
 * for the final event, so the normal timeout logic can run even if that event
 * never shows up.
 */
@Override
public void clearAwaitingFinalEvent() {
    awaitingFinalEvent = false;
    LOGGER.debug("ChildQueue {} cleared awaitingFinalEvent flag (timeout)", System.identityHashCode(this));
}

@Override
public void close() {
close(false);
Expand Down
Loading
Loading