Skip to content

Commit a549c8f

Browse files
committed
Fix missing coordinated synchronizations between threads.
1 parent ea06798 commit a549c8f

1 file changed

Lines changed: 21 additions & 13 deletions

File tree

xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -361,10 +361,12 @@ protected ExtProcClientCall(ClientCall<InputStream, InputStream> delegate,
361361
}
362362

363363
private void sendToDataPlane(Runnable action) {
364-
if (headersSent) {
365-
action.run();
366-
} else {
367-
pendingActions.add(action);
364+
synchronized (lock) {
365+
if (headersSent) {
366+
action.run();
367+
} else {
368+
pendingActions.add(action);
369+
}
368370
}
369371
}
370372

@@ -400,9 +402,11 @@ public void onNext(io.envoyproxy.envoy.service.ext_proc.v3.ProcessingResponse re
400402
if (response.getRequestHeaders().hasResponse()) {
401403
applyHeaderMutations(requestHeaders, response.getRequestHeaders().getResponse().getHeaderMutation());
402404
}
403-
headersSent = true;
404-
delegate().start(wrappedListener, requestHeaders);
405-
drainQueue();
405+
synchronized (lock) {
406+
headersSent = true;
407+
delegate().start(wrappedListener, requestHeaders);
408+
drainQueue();
409+
}
406410
}
407411
// 2. Client Message (Request Body)
408412
else if (response.hasRequestBody()) {
@@ -496,8 +500,10 @@ public void onCompleted() {
496500
}
497501

498502
if (config.getObservabilityMode()) {
499-
headersSent = true;
500-
delegate().start(wrappedListener, headers);
503+
synchronized (lock) {
504+
headersSent = true;
505+
delegate().start(wrappedListener, headers);
506+
}
501507
}
502508
}
503509

@@ -632,12 +638,14 @@ private void handleFailOpen(ExtProcListener listener) {
632638
if (extProcStreamCompleted.compareAndSet(false, true)) {
633639
// The ext_proc stream is gone. "Fail open" means we proceed with the RPC
634640
// without any more processing.
635-
if (!headersSent) {
636-
headersSent = true;
637-
delegate().start(listener, requestHeaders);
641+
synchronized (lock) {
642+
if (!headersSent) {
643+
headersSent = true;
644+
delegate().start(listener, requestHeaders);
645+
}
646+
drainQueue();
638647
}
639648
listener.unblockAfterStreamComplete();
640-
drainQueue();
641649
}
642650
}
643651
}

0 commit comments

Comments
 (0)