2525import io .grpc .MethodDescriptor ;
2626import io .grpc .Status ;
2727import io .grpc .stub .ClientCallStreamObserver ;
28+ import io .grpc .stub .ClientResponseObserver ;
2829import io .grpc .xds .internal .grpcservice .CachedChannelManager ;
2930import io .grpc .xds .internal .grpcservice .GrpcServiceConfig ;
3031import io .grpc .xds .internal .grpcservice .GrpcServiceConfigParser ;
3536import java .io .IOException ;
3637import java .io .InputStream ;
3738import java .util .List ;
39+ import java .util .Locale ;
3840import java .util .concurrent .Executor ;
3941import java .util .concurrent .ScheduledExecutorService ;
4042import java .util .concurrent .TimeUnit ;
@@ -156,16 +158,15 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
156158 MethodDescriptor <ReqT , RespT > method ,
157159 CallOptions callOptions ,
158160 Channel next ) {
159- Executor callExecutor = callOptions .getExecutor ();
160161 ExternalProcessorGrpc .ExternalProcessorStub stub = ExternalProcessorGrpc .newStub (
161162 cachedChannelManager .getChannel (filterConfig .grpcServiceConfig ))
162- .withExecutor (callExecutor );
163+ .withExecutor (callOptions . getExecutor () );
163164
164165 if (filterConfig .grpcServiceConfig .timeout () != null && filterConfig .grpcServiceConfig .timeout ().isPresent ()) {
165- long timeoutNanos = filterConfig .grpcServiceConfig .timeout ().get ().getSeconds () * 1_000_000_000L
166- + filterConfig .grpcServiceConfig .timeout ().get ().getNano ();
167- if (timeoutNanos > 0 ) {
168- stub = stub .withDeadlineAfter (timeoutNanos , TimeUnit .NANOSECONDS );
166+ long timeoutSeconds = filterConfig .grpcServiceConfig .timeout ().get ().getSeconds ();
167+ int timeoutNanos = filterConfig .grpcServiceConfig .timeout ().get ().getNano ();
168+ if (timeoutSeconds > 0 || timeoutNanos > 0 ) {
169+ stub = stub .withDeadlineAfter (timeoutSeconds * 1_000_000_000L + timeoutNanos , TimeUnit .NANOSECONDS );
169170 }
170171 }
171172
@@ -207,7 +208,7 @@ public void start(Listener<ExtRespT> responseListener, Metadata headers) {
207208 // Create a local subclass instance to buffer outbound actions
208209 ExtProcDelayedCall <InputStream , InputStream > delayedCall =
209210 new ExtProcDelayedCall <>(
210- callExecutor , scheduler , callOptions .getDeadline ());
211+ callOptions . getExecutor () , scheduler , callOptions .getDeadline ());
211212
212213 ExtProcClientCall extProcCall = new ExtProcClientCall (delayedCall , rawCall , stub , config );
213214
@@ -283,14 +284,17 @@ private static io.envoyproxy.envoy.config.core.v3.HeaderMap toHeaderMap(Metadata
283284 // Skip binary headers for this basic mapping
284285 if (key .endsWith (Metadata .BINARY_HEADER_SUFFIX )) {
285286 Metadata .Key <byte []> binKey = Metadata .Key .of (key , Metadata .BINARY_BYTE_MARSHALLER );
286- for (byte [] binValue : metadata .getAll (binKey )) {
287- String encoded = com .google .common .io .BaseEncoding .base64 ().encode (binValue );
288- io .envoyproxy .envoy .config .core .v3 .HeaderValue headerValue =
289- io .envoyproxy .envoy .config .core .v3 .HeaderValue .newBuilder ()
290- .setKey (key .toLowerCase ())
291- .setValue (encoded )
292- .build ();
293- builder .addHeaders (headerValue );
287+ Iterable <byte []> values = metadata .getAll (binKey );
288+ if (values != null ) {
289+ for (byte [] binValue : values ) {
290+ String encoded = com .google .common .io .BaseEncoding .base64 ().encode (binValue );
291+ io .envoyproxy .envoy .config .core .v3 .HeaderValue headerValue =
292+ io .envoyproxy .envoy .config .core .v3 .HeaderValue .newBuilder ()
293+ .setKey (key .toLowerCase (Locale .ROOT ))
294+ .setValue (encoded )
295+ .build ();
296+ builder .addHeaders (headerValue );
297+ }
294298 }
295299 } else {
296300 Metadata .Key <String > asciiKey = Metadata .Key .of (key , Metadata .ASCII_STRING_MARSHALLER );
@@ -299,7 +303,7 @@ private static io.envoyproxy.envoy.config.core.v3.HeaderMap toHeaderMap(Metadata
299303 for (String value : values ) {
300304 io .envoyproxy .envoy .config .core .v3 .HeaderValue headerValue =
301305 io .envoyproxy .envoy .config .core .v3 .HeaderValue .newBuilder ()
302- .setKey (key .toLowerCase ())
306+ .setKey (key .toLowerCase (Locale . ROOT ))
303307 .setValue (value )
304308 .build ();
305309 builder .addHeaders (headerValue );
@@ -310,34 +314,27 @@ private static io.envoyproxy.envoy.config.core.v3.HeaderMap toHeaderMap(Metadata
310314 return builder .build ();
311315 }
312316
313- private static void applyHeaderMutations (Metadata headers , io .envoyproxy .envoy .service .ext_proc .v3 .HeaderMutation mutation ) {
314- for (io .envoyproxy .envoy .config .core .v3 .HeaderValueOption opt : mutation .getSetHeadersList ()) {
315- String keyStr = opt .getHeader ().getKey ().toLowerCase ();
316- String valueStr = opt .getHeader ().getValue ();
317- boolean isBinary = keyStr .endsWith (Metadata .BINARY_HEADER_SUFFIX );
318-
319- if (isBinary ) {
320- Metadata .Key <byte []> key = Metadata .Key .of (keyStr , Metadata .BINARY_BYTE_MARSHALLER );
321- if (!opt .getAppend ().getValue ()) {
322- headers .discardAll (key );
323- }
324- byte [] decodedValue = com .google .common .io .BaseEncoding .base64 ().decode (valueStr );
325- headers .put (key , decodedValue );
326- } else {
327- Metadata .Key <String > key = Metadata .Key .of (keyStr , Metadata .ASCII_STRING_MARSHALLER );
328- if (!opt .getAppend ().getValue ()) {
329- headers .discardAll (key );
317+ private static void applyHeaderMutations (Metadata metadata , io .envoyproxy .envoy .service .ext_proc .v3 .HeaderMutation mutation ) {
318+ for (io .envoyproxy .envoy .config .core .v3 .HeaderValueOption setHeader : mutation .getSetHeadersList ()) {
319+ String key = setHeader .getHeader ().getKey ();
320+ String value = setHeader .getHeader ().getValue ();
321+ try {
322+ Metadata .Key <String > metadataKey = Metadata .Key .of (key , Metadata .ASCII_STRING_MARSHALLER );
323+ if (setHeader .getAppendAction () == io .envoyproxy .envoy .config .core .v3 .HeaderValueOption .HeaderAppendAction .APPEND_IF_EXISTS_OR_ADD
324+ || setHeader .getAppendAction () == io .envoyproxy .envoy .config .core .v3 .HeaderValueOption .HeaderAppendAction .OVERWRITE_IF_EXISTS_OR_ADD ) {
325+ metadata .removeAll (metadataKey );
330326 }
331- headers .put (key , valueStr );
327+ metadata .put (metadataKey , value );
328+ } catch (IllegalArgumentException e ) {
329+ // Skip
332330 }
333331 }
334-
335- for (String keyToRemove : mutation .getRemoveHeadersList ()) {
336- String lowKey = keyToRemove .toLowerCase ();
337- if (lowKey .endsWith (Metadata .BINARY_HEADER_SUFFIX )) {
338- headers .discardAll (Metadata .Key .of (lowKey , Metadata .BINARY_BYTE_MARSHALLER ));
339- } else {
340- headers .discardAll (Metadata .Key .of (lowKey , Metadata .ASCII_STRING_MARSHALLER ));
332+ for (String removeHeader : mutation .getRemoveHeadersList ()) {
333+ try {
334+ Metadata .Key <String > metadataKey = Metadata .Key .of (removeHeader , Metadata .ASCII_STRING_MARSHALLER );
335+ metadata .removeAll (metadataKey );
336+ } catch (IllegalArgumentException e ) {
337+ // Skip
341338 }
342339 }
343340 }
@@ -361,13 +358,14 @@ private static class ExtProcClientCall extends SimpleForwardingClientCall<InputS
361358 private final ClientCall <InputStream , InputStream > rawCall ;
362359 private final ExtProcDelayedCall <InputStream , InputStream > delayedCall ;
363360 private final Object streamLock = new Object ();
364- private ClientCallStreamObserver <ProcessingRequest > extProcClientCallRequestObserver ;
361+ private io . grpc . stub . ClientCallStreamObserver <ProcessingRequest > extProcClientCallRequestObserver ;
365362 private ExtProcListener wrappedListener ;
366363
367364 private Metadata requestHeaders ;
368365 final AtomicBoolean extProcStreamFailed = new AtomicBoolean (false );
369366 final AtomicBoolean extProcStreamCompleted = new AtomicBoolean (false );
370367 final AtomicBoolean drainingExtProcStream = new AtomicBoolean (false );
368+ final AtomicBoolean halfClosed = new AtomicBoolean (false );
371369
372370 protected ExtProcClientCall (
373371 ExtProcDelayedCall <InputStream , InputStream > delayedCall ,
@@ -396,87 +394,106 @@ public void start(Listener<InputStream> responseListener, Metadata headers) {
396394 // DelayedClientCall.start will buffer the listener and headers until setCall is called.
397395 super .start (wrappedListener , headers );
398396
399- extProcClientCallRequestObserver = ( ClientCallStreamObserver < ProcessingRequest >) stub .process (new io . grpc . stub . StreamObserver < ProcessingResponse >() {
397+ stub .process (new ClientResponseObserver < ProcessingRequest , ProcessingResponse >() {
400398 @ Override
401- public void onNext (ProcessingResponse response ) {
402- if (response .hasImmediateResponse ()) {
403- handleImmediateResponse (response .getImmediateResponse (), responseListener );
404- return ;
405- }
406-
407- if (config .getObservabilityMode ()) {
408- return ;
409- }
399+ public void beforeStart (ClientCallStreamObserver <ProcessingRequest > requestStream ) {
400+ extProcClientCallRequestObserver = requestStream ;
401+ }
410402
411- if (response .getRequestDrain ()) {
412- drainingExtProcStream .set (true );
413- synchronized (streamLock ) {
414- extProcClientCallRequestObserver .onCompleted ();
403+ @ Override
404+ public void onNext (ProcessingResponse response ) {
405+ try {
406+ if (response .hasImmediateResponse ()) {
407+ handleImmediateResponse (response .getImmediateResponse (), responseListener );
408+ return ;
415409 }
416- return ;
417- }
418410
419- // 1. Client Headers
420- if (response .hasRequestHeaders ()) {
421- if (response .getRequestHeaders ().hasResponse ()) {
422- applyHeaderMutations (requestHeaders , response .getRequestHeaders ().getResponse ().getHeaderMutation ());
411+ if (config .getObservabilityMode ()) {
412+ return ;
423413 }
424- activateCall ();
425- }
426- // 2. Client Message (Request Body)
427- else if (response .hasRequestBody ()) {
428- if (response .getRequestBody ().hasResponse ()
429- && response .getRequestBody ().getResponse ().hasBodyMutation ()
430- && response .getRequestBody ().getResponse ().getBodyMutation ().hasStreamedResponse ()
431- && response .getRequestBody ().getResponse ().getBodyMutation ().getStreamedResponse ().getGrpcMessageCompressed ()) {
432- io .grpc .StatusRuntimeException ex = io .grpc .Status .INTERNAL
433- .withDescription ("gRPC message compression not supported in ext_proc" )
434- .asRuntimeException ();
414+
415+ if (response .getRequestDrain ()) {
416+ drainingExtProcStream .set (true );
435417 synchronized (streamLock ) {
436- extProcClientCallRequestObserver .onError ( ex );
418+ extProcClientCallRequestObserver .onCompleted ( );
437419 }
438- onError (ex );
439420 return ;
440421 }
441- handleRequestBodyResponse (response .getRequestBody ());
442- }
443- // 4. Server Headers
444- else if (response .hasResponseHeaders ()) {
445- if (response .getResponseHeaders ().hasResponse ()) {
446- applyHeaderMutations (wrappedListener .savedHeaders , response .getResponseHeaders ().getResponse ().getHeaderMutation ());
422+
423+ // 1. Client Headers
424+ if (response .hasRequestHeaders ()) {
425+ if (response .getRequestHeaders ().hasResponse ()) {
426+ applyHeaderMutations (requestHeaders , response .getRequestHeaders ().getResponse ().getHeaderMutation ());
427+ }
428+ activateCall ();
447429 }
448- wrappedListener .proceedWithHeaders ();
449- }
450- // 5. Server Message (Response Body)
451- else if (response .hasResponseBody ()) {
452- if (response .getResponseBody ().hasResponse ()
453- && response .getResponseBody ().getResponse ().hasBodyMutation ()
454- && response .getResponseBody ().getResponse ().getBodyMutation ().hasStreamedResponse ()
455- && response .getResponseBody ().getResponse ().getBodyMutation ().getStreamedResponse ().getGrpcMessageCompressed ()) {
456- io .grpc .StatusRuntimeException ex = io .grpc .Status .INTERNAL
457- .withDescription ("gRPC message compression not supported in ext_proc" )
458- .asRuntimeException ();
459- synchronized (streamLock ) {
460- extProcClientCallRequestObserver .onError (ex );
430+ // 2. Client Message (Request Body)
431+ else if (response .hasRequestBody ()) {
432+ if (response .getRequestBody ().hasResponse ()
433+ && response .getRequestBody ().getResponse ().hasBodyMutation ()) {
434+ io .envoyproxy .envoy .service .ext_proc .v3 .BodyMutation mutation =
435+ response .getRequestBody ().getResponse ().getBodyMutation ();
436+ if (mutation .hasStreamedResponse ()
437+ && mutation .getStreamedResponse ().getGrpcMessageCompressed ()) {
438+ io .grpc .StatusRuntimeException ex = io .grpc .Status .INTERNAL
439+ .withDescription ("gRPC message compression not supported in ext_proc" )
440+ .asRuntimeException ();
441+ synchronized (streamLock ) {
442+ extProcClientCallRequestObserver .onError (ex );
443+ }
444+ onError (ex );
445+ return ;
446+ }
461447 }
462- onError (ex );
463- return ;
448+ handleRequestBodyResponse (response .getRequestBody ());
464449 }
465- handleResponseBodyResponse (response .getResponseBody (), wrappedListener );
466- }
467- // 6. Response Trailers
468- if (response .hasResponseTrailers ()) {
469- if (response .getResponseTrailers ().hasHeaderMutation ()) {
470- applyHeaderMutations (
471- wrappedListener .savedTrailers ,
472- response .getResponseTrailers ().getHeaderMutation ()
473- );
450+ // 4. Server Headers
451+ else if (response .hasResponseHeaders ()) {
452+ if (response .getResponseHeaders ().hasResponse ()) {
453+ applyHeaderMutations (wrappedListener .savedHeaders , response .getResponseHeaders ().getResponse ().getHeaderMutation ());
454+ }
455+ wrappedListener .proceedWithHeaders ();
474456 }
475- wrappedListener .proceedWithClose ();
476- synchronized (streamLock ) {
477- extProcClientCallRequestObserver .onCompleted ();
457+ // 5. Server Message (Response Body)
458+ else if (response .hasResponseBody ()) {
459+ if (response .getResponseBody ().hasResponse ()
460+ && response .getResponseBody ().getResponse ().hasBodyMutation ()) {
461+ io .envoyproxy .envoy .service .ext_proc .v3 .BodyMutation mutation =
462+ response .getResponseBody ().getResponse ().getBodyMutation ();
463+ if (mutation .hasStreamedResponse ()
464+ && mutation .getStreamedResponse ().getGrpcMessageCompressed ()) {
465+ io .grpc .StatusRuntimeException ex = io .grpc .Status .INTERNAL
466+ .withDescription ("gRPC message compression not supported in ext_proc" )
467+ .asRuntimeException ();
468+ synchronized (streamLock ) {
469+ extProcClientCallRequestObserver .onError (ex );
470+ }
471+ onError (ex );
472+ return ;
473+ }
474+ }
475+ handleResponseBodyResponse (response .getResponseBody (), wrappedListener );
476+ }
477+ // 6. Response Trailers
478+ if (response .hasResponseTrailers ()) {
479+ if (response .getResponseTrailers ().hasHeaderMutation ()) {
480+ applyHeaderMutations (
481+ wrappedListener .savedTrailers ,
482+ response .getResponseTrailers ().getHeaderMutation ()
483+ );
484+ }
485+ wrappedListener .proceedWithClose ();
486+ synchronized (streamLock ) {
487+ extProcClientCallRequestObserver .onCompleted ();
488+ }
478489 }
479490 }
491+ // For robustness. For any internal processing failure make sure the internal state
492+ // machine is notified and the dataplane call is properly cancelled (or failed-open if
493+ // configured)
494+ catch (Throwable t ) {
495+ onError (t );
496+ }
480497 }
481498
482499 @ Override
@@ -501,8 +518,8 @@ public void onCompleted() {
501518 extProcClientCallRequestObserver .setOnReadyHandler (this ::onExtProcStreamReady );
502519 }
503520
504- wrappedListener . setStream ( extProcClientCallRequestObserver );
505-
521+ // Send initial request headers. This is safe here because stub.process()
522+ // has started the call.
506523 synchronized (streamLock ) {
507524 extProcClientCallRequestObserver .onNext (ProcessingRequest .newBuilder ()
508525 .setRequestHeaders (io .envoyproxy .envoy .service .ext_proc .v3 .HttpHeaders .newBuilder ()
@@ -577,6 +594,7 @@ public void sendMessage(InputStream message) {
577594
578595 @ Override
579596 public void halfClose () {
597+ halfClosed .set (true );
580598 if (extProcStreamCompleted .get ()) {
581599 super .halfClose ();
582600 return ;
@@ -609,10 +627,14 @@ private void handleRequestBodyResponse(io.envoyproxy.envoy.service.ext_proc.v3.B
609627 if (bodyResponse .hasResponse () && bodyResponse .getResponse ().hasBodyMutation ()) {
610628 io .envoyproxy .envoy .service .ext_proc .v3 .BodyMutation mutation = bodyResponse .getResponse ().getBodyMutation ();
611629 if (mutation .hasBody () && !mutation .getBody ().isEmpty ()) {
612- byte [] mutatedBody = mutation .getBody ().toByteArray ();
613- super .sendMessage (new ByteArrayInputStream (mutatedBody ));
630+ if (!halfClosed .get ()) {
631+ byte [] mutatedBody = mutation .getBody ().toByteArray ();
632+ super .sendMessage (new ByteArrayInputStream (mutatedBody ));
633+ }
614634 } else if (mutation .getClearBody ()) {
615- super .sendMessage (new ByteArrayInputStream (new byte [0 ]));
635+ if (!halfClosed .get ()) {
636+ super .sendMessage (new ByteArrayInputStream (new byte [0 ]));
637+ }
616638 }
617639 }
618640 }
@@ -648,7 +670,6 @@ private void handleFailOpen(ExtProcListener listener) {
648670 private static class ExtProcListener extends ForwardingClientCallListener .SimpleForwardingClientCallListener <InputStream > {
649671 private final ClientCall <?, ?> rawCall ;
650672 private final ExtProcClientCall extProcClientCall ;
651- private ClientCallStreamObserver <ProcessingRequest > stream ;
652673 private Metadata savedHeaders ;
653674 private Metadata savedTrailers ;
654675 private io .grpc .Status savedStatus ;
@@ -660,8 +681,6 @@ protected ExtProcListener(ClientCall.Listener<InputStream> delegate, ClientCall<
660681 this .extProcClientCall = extProcClientCall ;
661682 }
662683
663- void setStream (ClientCallStreamObserver <ProcessingRequest > stream ) { this .stream = stream ; }
664-
665684 @ Override
666685 public void onReady () {
667686 if (extProcClientCall .drainingExtProcStream .get ()) {
0 commit comments