Skip to content

Commit 7d292ee

Browse files
committed
Refactor ExternalProcessorFilter to use new FilterContext API.
Merged changes from sauravzg-feat-bootstrap-filter-context branch and updated ExternalProcessorFilter and its test to work with the new Filter.Provider.parseFilterConfig API which now takes a FilterContext containing BootstrapInfo and ServerInfo.
1 parent c940c80 commit 7d292ee

16 files changed

Lines changed: 109 additions & 102 deletions

xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import io.grpc.ClientInterceptor;
2121
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
2222
import io.grpc.ForwardingClientCallListener;
23-
import io.grpc.ManagedChannel;
2423
import io.grpc.Metadata;
2524
import io.grpc.MethodDescriptor;
2625
import io.grpc.Status;
@@ -30,12 +29,10 @@
3029
import io.grpc.xds.internal.grpcservice.GrpcServiceConfig;
3130
import io.grpc.xds.internal.grpcservice.GrpcServiceConfigParser;
3231
import io.grpc.xds.internal.grpcservice.GrpcServiceParseException;
33-
import io.grpc.xds.internal.grpcservice.GrpcServiceXdsContextProvider;
3432
import io.grpc.xds.internal.grpcservice.HeaderValue;
3533
import java.io.ByteArrayInputStream;
3634
import java.io.IOException;
3735
import java.io.InputStream;
38-
import java.util.List;
3936
import java.util.Locale;
4037
import java.util.concurrent.Executor;
4138
import java.util.concurrent.ScheduledExecutorService;
@@ -53,7 +50,6 @@ public ExternalProcessorFilter(String name) {
5350
}
5451

5552
static final class Provider implements Filter.Provider {
56-
private GrpcServiceXdsContextProvider grpcServiceXdsContextProvider;
5753
@Override
5854
public String[] typeUrls() {
5955
return new String[]{TYPE_URL};
@@ -65,13 +61,13 @@ public boolean isClientFilter() {
6561
}
6662

6763
@Override
68-
public ExternalProcessorFilter newInstance(String name, GrpcServiceXdsContextProvider grpcServiceXdsContextProvider) {
69-
this.grpcServiceXdsContextProvider = grpcServiceXdsContextProvider;
64+
public ExternalProcessorFilter newInstance(String name) {
7065
return new ExternalProcessorFilter(name);
7166
}
7267

7368
@Override
74-
public ConfigOrError<ExternalProcessorFilterConfig> parseFilterConfig(Message rawProtoMessage) {
69+
public ConfigOrError<ExternalProcessorFilterConfig> parseFilterConfig(
70+
Message rawProtoMessage, FilterContext context) {
7571
if (!(rawProtoMessage instanceof Any)) {
7672
return ConfigOrError.fromError("Invalid config type: " + rawProtoMessage.getClass());
7773
}
@@ -91,16 +87,18 @@ public ConfigOrError<ExternalProcessorFilterConfig> parseFilterConfig(Message ra
9187
}
9288

9389
try {
94-
GrpcServiceConfig grpcServiceConfig = GrpcServiceConfigParser.parse(externalProcessor.getGrpcService(), grpcServiceXdsContextProvider);
90+
GrpcServiceConfig grpcServiceConfig = GrpcServiceConfigParser.parse(
91+
externalProcessor.getGrpcService(), context.bootstrapInfo(), context.serverInfo());
9592
return ConfigOrError.fromConfig(new ExternalProcessorFilterConfig(externalProcessor, grpcServiceConfig));
9693
} catch (GrpcServiceParseException e) {
9794
return ConfigOrError.fromError("Error parsing GrpcService config: " + e.getMessage());
9895
}
9996
}
10097

10198
@Override
102-
public ConfigOrError<? extends FilterConfig> parseFilterConfigOverride(Message rawProtoMessage) {
103-
return parseFilterConfig(rawProtoMessage);
99+
public ConfigOrError<? extends FilterConfig> parseFilterConfigOverride(
100+
Message rawProtoMessage, FilterContext context) {
101+
return parseFilterConfig(rawProtoMessage, context);
104102
}
105103
}
106104

@@ -487,11 +485,7 @@ else if (response.hasResponseBody()) {
487485
extProcClientCallRequestObserver.onCompleted();
488486
}
489487
}
490-
}
491-
// For robustness. For any internal processing failure make sure the internal state
492-
// machine is notified and the dataplane call is properly cancelled (or failed-open if
493-
// configured)
494-
catch (Throwable t) {
488+
} catch (Throwable t) {
495489
onError(t);
496490
}
497491
}

xds/src/main/java/io/grpc/xds/FaultFilter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import io.grpc.xds.FaultConfig.FaultAbort;
4747
import io.grpc.xds.FaultConfig.FaultDelay;
4848
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
49-
import io.grpc.xds.internal.grpcservice.GrpcServiceXdsContextProvider;
5049
import java.util.Locale;
5150
import java.util.concurrent.Executor;
5251
import java.util.concurrent.ScheduledExecutorService;
@@ -100,7 +99,7 @@ public boolean isClientFilter() {
10099
}
101100

102101
@Override
103-
public FaultFilter newInstance(String name, GrpcServiceXdsContextProvider grpcServiceXdsContextProvider) {
102+
public FaultFilter newInstance(String name) {
104103
return INSTANCE;
105104
}
106105

xds/src/main/java/io/grpc/xds/GcpAuthenticationFilter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import io.grpc.xds.MetadataRegistry.MetadataValueParser;
4646
import io.grpc.xds.XdsConfig.XdsClusterConfig;
4747
import io.grpc.xds.client.XdsResourceType.ResourceInvalidException;
48-
import io.grpc.xds.internal.grpcservice.GrpcServiceXdsContextProvider;
4948
import java.util.LinkedHashMap;
5049
import java.util.Map;
5150
import java.util.concurrent.ScheduledExecutorService;
@@ -82,7 +81,7 @@ public boolean isClientFilter() {
8281
}
8382

8483
@Override
85-
public GcpAuthenticationFilter newInstance(String name, GrpcServiceXdsContextProvider grpcServiceXdsContextProvider) {
84+
public GcpAuthenticationFilter newInstance(String name) {
8685
return new GcpAuthenticationFilter(name, cacheSize);
8786
}
8887

xds/src/main/java/io/grpc/xds/InternalRbacFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public static ServerInterceptor createInterceptor(RBAC rbac) {
3333
throw new IllegalArgumentException(
3434
String.format("Failed to parse Rbac policy: %s", filterConfig.errorDetail));
3535
}
36-
return new RbacFilter.Provider().newInstance("internalRbacFilter", null)
36+
return new RbacFilter.Provider().newInstance("internalRbacFilter")
3737
.buildServerInterceptor(filterConfig.config, null);
3838
}
3939
}

xds/src/main/java/io/grpc/xds/RbacFilter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import io.grpc.Status;
3636
import io.grpc.xds.internal.MatcherParser;
3737
import io.grpc.xds.internal.Matchers;
38-
import io.grpc.xds.internal.grpcservice.GrpcServiceXdsContextProvider;
3938
import io.grpc.xds.internal.rbac.engine.GrpcAuthorizationEngine;
4039
import io.grpc.xds.internal.rbac.engine.GrpcAuthorizationEngine.AlwaysTrueMatcher;
4140
import io.grpc.xds.internal.rbac.engine.GrpcAuthorizationEngine.AndMatcher;
@@ -90,7 +89,7 @@ public boolean isServerFilter() {
9089
}
9190

9291
@Override
93-
public RbacFilter newInstance(String name, GrpcServiceXdsContextProvider grpcServiceXdsContextProvider) {
92+
public RbacFilter newInstance(String name) {
9493
return INSTANCE;
9594
}
9695

xds/src/main/java/io/grpc/xds/RouterFilter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package io.grpc.xds;
1818

1919
import com.google.protobuf.Message;
20-
import io.grpc.xds.internal.grpcservice.GrpcServiceXdsContextProvider;
2120

2221
/**
2322
* Router filter implementation. Currently this filter does not parse any field in the config.
@@ -57,7 +56,7 @@ public boolean isServerFilter() {
5756
}
5857

5958
@Override
60-
public RouterFilter newInstance(String name, GrpcServiceXdsContextProvider grpcServiceXdsContextProvider) {
59+
public RouterFilter newInstance(String name) {
6160
return INSTANCE;
6261
}
6362

xds/src/main/java/io/grpc/xds/XdsNameResolver.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import io.grpc.xds.client.XdsInitializationException;
6868
import io.grpc.xds.client.XdsLogger;
6969
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
70+
import java.net.URI;
7071
import java.util.ArrayList;
7172
import java.util.Collections;
7273
import java.util.HashMap;
@@ -109,6 +110,7 @@ final class XdsNameResolver extends NameResolver {
109110
private final XdsLogger logger;
110111
@Nullable
111112
private final String targetAuthority;
113+
private final String target;
112114
private final String serviceAuthority;
113115
// Encoded version of the service authority as per
114116
// https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.
@@ -139,12 +141,12 @@ final class XdsNameResolver extends NameResolver {
139141
private ResolveState resolveState;
140142

141143
XdsNameResolver(
142-
String target, @Nullable String targetAuthority, String name,
143-
@Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser,
144+
URI targetUri, String name, @Nullable String overrideAuthority,
145+
ServiceConfigParser serviceConfigParser,
144146
SynchronizationContext syncContext, ScheduledExecutorService scheduler,
145147
@Nullable Map<String, ?> bootstrapOverride,
146148
MetricRecorder metricRecorder, Args nameResolverArgs) {
147-
this(target, targetAuthority, name, overrideAuthority, serviceConfigParser,
149+
this(targetUri, targetUri.getAuthority(), name, overrideAuthority, serviceConfigParser,
148150
syncContext, scheduler,
149151
bootstrapOverride == null
150152
? SharedXdsClientPoolProvider.getDefaultProvider()
@@ -155,13 +157,14 @@ final class XdsNameResolver extends NameResolver {
155157

156158
@VisibleForTesting
157159
XdsNameResolver(
158-
String target, @Nullable String targetAuthority, String name,
160+
URI targetUri, @Nullable String targetAuthority, String name,
159161
@Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser,
160162
SynchronizationContext syncContext, ScheduledExecutorService scheduler,
161163
XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random,
162164
FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride,
163165
MetricRecorder metricRecorder, Args nameResolverArgs) {
164166
this.targetAuthority = targetAuthority;
167+
target = targetUri.toString();
165168

166169
// The name might have multiple slashes so encode it before verifying.
167170
serviceAuthority = checkNotNull(name, "name");
@@ -733,7 +736,7 @@ private void updateActiveFilters(@Nullable List<NamedFilterConfig> filterConfigs
733736
Filter.Provider provider = filterRegistry.get(typeUrl);
734737
checkNotNull(provider, "provider %s", typeUrl);
735738
Filter filter = activeFilters.computeIfAbsent(
736-
filterKey, k -> provider.newInstance(namedFilter.name, null));
739+
filterKey, k -> provider.newInstance(namedFilter.name));
737740
checkNotNull(filter, "filter %s", filterKey);
738741
filtersToShutdown.remove(filterKey);
739742
}
@@ -875,7 +878,6 @@ private ClientInterceptor createFilters(
875878
}
876879

877880
ImmutableList.Builder<ClientInterceptor> filterInterceptors = ImmutableList.builder();
878-
ClientInterceptor extProcInterceptor = null;
879881
for (NamedFilterConfig namedFilter : filterConfigs) {
880882
String name = namedFilter.name;
881883
FilterConfig config = namedFilter.filterConfig;
@@ -888,18 +890,10 @@ private ClientInterceptor createFilters(
888890
filter.buildClientInterceptor(config, overrideConfig, scheduler);
889891

890892
if (interceptor != null) {
891-
if (config.typeUrl().equals(ExternalProcessorFilter.TYPE_URL)) {
892-
extProcInterceptor = interceptor;
893-
} else {
894-
filterInterceptors.add(interceptor);
895-
}
893+
filterInterceptors.add(interceptor);
896894
}
897895
}
898896

899-
if (extProcInterceptor != null) {
900-
filterInterceptors.add(extProcInterceptor);
901-
}
902-
903897
// Combine interceptors produced by different filters into a single one that executes
904898
// them sequentially. The order is preserved.
905899
return combineInterceptors(filterInterceptors.build());

xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java

Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import io.grpc.Internal;
2323
import io.grpc.NameResolver.Args;
2424
import io.grpc.NameResolverProvider;
25-
import io.grpc.Uri;
2625
import io.grpc.xds.client.XdsClient;
2726
import java.net.InetSocketAddress;
2827
import java.net.SocketAddress;
@@ -87,39 +86,16 @@ public XdsNameResolver newNameResolver(URI targetUri, Args args) {
8786
targetPath,
8887
targetUri);
8988
String name = targetPath.substring(1);
90-
return newNameResolver(targetUri.toString(), targetUri.getAuthority(), name, args);
89+
return new XdsNameResolver(
90+
targetUri, name, args.getOverrideAuthority(),
91+
args.getServiceConfigParser(), args.getSynchronizationContext(),
92+
args.getScheduledExecutorService(),
93+
bootstrapOverride,
94+
args.getMetricRecorder(), args);
9195
}
9296
return null;
9397
}
9498

95-
@Override
96-
public XdsNameResolver newNameResolver(Uri targetUri, Args args) {
97-
if (scheme.equals(targetUri.getScheme())) {
98-
Preconditions.checkArgument(
99-
targetUri.isPathAbsolute(),
100-
"the path component of the target (%s) must start with '/'",
101-
targetUri);
102-
return newNameResolver(
103-
targetUri.toString(), targetUri.getAuthority(), targetUri.getPath().substring(1), args);
104-
}
105-
return null;
106-
}
107-
108-
private XdsNameResolver newNameResolver(
109-
String targetUri, String targetAuthority, String name, Args args) {
110-
return new XdsNameResolver(
111-
targetUri.toString(),
112-
targetAuthority,
113-
name,
114-
args.getOverrideAuthority(),
115-
args.getServiceConfigParser(),
116-
args.getSynchronizationContext(),
117-
args.getScheduledExecutorService(),
118-
bootstrapOverride,
119-
args.getMetricRecorder(),
120-
args);
121-
}
122-
12399
@Override
124100
public String getDefaultScheme() {
125101
return scheme;

xds/src/main/java/io/grpc/xds/XdsServerWrapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -612,7 +612,7 @@ private void updateActiveFiltersForChain(
612612
Filter.Provider provider = filterRegistry.get(typeUrl);
613613
checkNotNull(provider, "provider %s", typeUrl);
614614
Filter filter = chainFilters.computeIfAbsent(
615-
filterKey, k -> provider.newInstance(namedFilter.name, null));
615+
filterKey, k -> provider.newInstance(namedFilter.name));
616616
checkNotNull(filter, "filter %s", filterKey);
617617
filtersToShutdown.remove(filterKey);
618618
}

0 commit comments

Comments
 (0)