diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index a7100e97f0e..7dd6416a15e 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -27,6 +27,7 @@ conscrypt = "org.conscrypt:conscrypt-openjdk-uber:2.5.2" # Cronet (API and Embedded) upgrade is blocked by https://github.com/grpc/grpc-java/issues/10396. cronet-api = "org.chromium.net:cronet-api:108.5359.79" cronet-embedded = "org.chromium.net:cronet-embedded:108.5359.79" +dev-cel = "dev.cel:cel:0.4.1" errorprone-annotations = "com.google.errorprone:error_prone_annotations:2.23.0" errorprone-core = "com.google.errorprone:error_prone_core:2.23.0" google-api-protos = "com.google.api.grpc:proto-google-common-protos:2.29.0" diff --git a/xds/build.gradle b/xds/build.gradle index 71c2ef72274..f9e864536e0 100644 --- a/xds/build.gradle +++ b/xds/build.gradle @@ -53,6 +53,7 @@ dependencies { project(':grpc-services'), project(':grpc-auth'), project(path: ':grpc-alts', configuration: 'shadow'), + libraries.dev.cel, libraries.gson, libraries.re2j, libraries.auto.value.annotations, diff --git a/xds/src/main/java/io/grpc/xds/Filter.java b/xds/src/main/java/io/grpc/xds/Filter.java index 4b2767687f3..9383f6df63a 100644 --- a/xds/src/main/java/io/grpc/xds/Filter.java +++ b/xds/src/main/java/io/grpc/xds/Filter.java @@ -50,6 +50,12 @@ interface Filter { */ ConfigOrError parseFilterConfigOverride(Message rawProtoMessage); + default void shutdown() { + // Implement as needed. + // TODO(sergiitk): [DESIGN] important to cover and discuss in the design. + // TODO(sergiitk): [QUESTION] should it be in ServerInterceptorBuilder? + } + /** Represents an opaque data structure holding configuration for a filter. */ interface FilterConfig { String typeUrl(); @@ -68,6 +74,15 @@ interface ServerInterceptorBuilder { @Nullable ServerInterceptor buildServerInterceptor( FilterConfig config, @Nullable FilterConfig overrideConfig); + + @Nullable + default ServerInterceptor buildServerInterceptor( + FilterConfig config, + @Nullable FilterConfig overrideConfig, + ScheduledExecutorService scheduler) { + return buildServerInterceptor(config, overrideConfig); + } + } /** Filter config with instance name. */ diff --git a/xds/src/main/java/io/grpc/xds/FilterRegistry.java b/xds/src/main/java/io/grpc/xds/FilterRegistry.java index 7f1fe82c6c3..ae932746761 100644 --- a/xds/src/main/java/io/grpc/xds/FilterRegistry.java +++ b/xds/src/main/java/io/grpc/xds/FilterRegistry.java @@ -37,7 +37,8 @@ static synchronized FilterRegistry getDefaultRegistry() { instance = newRegistry().register( FaultFilter.INSTANCE, RouterFilter.INSTANCE, - RbacFilter.INSTANCE); + RbacFilter.INSTANCE, + RlqsFilter.INSTANCE); } return instance; } diff --git a/xds/src/main/java/io/grpc/xds/RlqsFilter.java b/xds/src/main/java/io/grpc/xds/RlqsFilter.java new file mode 100644 index 00000000000..7e1712ff091 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/RlqsFilter.java @@ -0,0 +1,244 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.xds.client.XdsResourceType.ResourceInvalidException; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaBucketSettings; +import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaFilterConfig; +import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaOverride; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCall.Listener; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.xds.Filter.ServerInterceptorBuilder; +import io.grpc.xds.internal.datatype.GrpcService; +import io.grpc.xds.internal.matchers.Matcher; +import io.grpc.xds.internal.matchers.MatcherList; +import io.grpc.xds.internal.matchers.OnMatch; +import io.grpc.xds.internal.rlqs.RlqsBucketSettings; +import io.grpc.xds.internal.rlqs.RlqsClientPool; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; + +/** RBAC Http filter implementation. */ +final class RlqsFilter implements Filter, ServerInterceptorBuilder { + // private static final Logger logger = Logger.getLogger(RlqsFilter.class.getName()); + + static final RlqsFilter INSTANCE = new RlqsFilter(); + + static final String TYPE_URL = "type.googleapis.com/" + + "envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaFilterConfig"; + static final String TYPE_URL_OVERRIDE_CONFIG = "type.googleapis.com/" + + "envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaOverride"; + + private final AtomicReference rlqsClientPoolRef = new AtomicReference<>(); + + // RlqsFilter() { + // rlqsClientPool = new RlqsClientPool() + // } + + @Override + public String[] typeUrls() { + return new String[]{TYPE_URL, TYPE_URL_OVERRIDE_CONFIG}; + } + + @Override + public ConfigOrError parseFilterConfig(Message rawProtoMessage) { + try { + RlqsFilterConfig rlqsFilterConfig = + parseRlqsFilter(unpackAny(rawProtoMessage, RateLimitQuotaFilterConfig.class)); + return ConfigOrError.fromConfig(rlqsFilterConfig); + } catch (InvalidProtocolBufferException e) { + return ConfigOrError.fromError("Can't unpack RateLimitQuotaFilterConfig proto: " + e); + } catch (ResourceInvalidException e) { + return ConfigOrError.fromError(e.getMessage()); + } + } + + @Override + public ConfigOrError parseFilterConfigOverride(Message rawProtoMessage) { + try { + RlqsFilterConfig rlqsFilterConfig = + parseRlqsFilterOverride(unpackAny(rawProtoMessage, RateLimitQuotaOverride.class)); + return ConfigOrError.fromConfig(rlqsFilterConfig); + } catch (InvalidProtocolBufferException e) { + return ConfigOrError.fromError("Can't unpack RateLimitQuotaOverride proto: " + e); + } catch (ResourceInvalidException e) { + return ConfigOrError.fromError(e.getMessage()); + } + } + + @Nullable + @Override + public ServerInterceptor buildServerInterceptor( + FilterConfig config, @Nullable FilterConfig overrideConfig) { + throw new UnsupportedOperationException("ScheduledExecutorService scheduler required"); + } + + @Override + public ServerInterceptor buildServerInterceptor( + FilterConfig config, + @Nullable FilterConfig overrideConfig, + ScheduledExecutorService scheduler) { + // Called when we get an xds update - when the LRS or RLS changes. + RlqsFilterConfig rlqsFilterConfig = (RlqsFilterConfig) checkNotNull(config, "config"); + + // Per-route and per-host configuration overrides. + if (overrideConfig != null) { + RlqsFilterConfig rlqsFilterOverride = (RlqsFilterConfig) overrideConfig; + // All fields are inherited from the main config, unless overridden. + RlqsFilterConfig.Builder overrideBuilder = rlqsFilterConfig.toBuilder(); + if (!rlqsFilterOverride.domain().isEmpty()) { + overrideBuilder.domain(rlqsFilterOverride.domain()); + } + if (rlqsFilterOverride.bucketMatchers() != null) { + overrideBuilder.bucketMatchers(rlqsFilterOverride.bucketMatchers()); + } + // Override bucket matchers if not null. + rlqsFilterConfig = overrideBuilder.build(); + } + + rlqsClientPoolRef.compareAndSet(null, RlqsClientPool.newInstance(scheduler)); + return generateRlqsInterceptor(rlqsFilterConfig); + } + + @Override + public void shutdown() { + // TODO(sergiitk): [DESIGN] besides shutting down everything, should there + // be per-route interceptor destructors? + RlqsClientPool oldClientPool = rlqsClientPoolRef.getAndUpdate(unused -> null); + if (oldClientPool != null) { + oldClientPool.shutdown(); + } + } + + @Nullable + private ServerInterceptor generateRlqsInterceptor(RlqsFilterConfig config) { + checkNotNull(config, "config"); + checkNotNull(config.rlqsService(), "config.rlqsService"); + RlqsClientPool rlqsClientPool = rlqsClientPoolRef.get(); + if (rlqsClientPool == null) { + // Being shut down, return no interceptor. + return null; + } + // TODO(sergiitk): [DESIGN] Rlqs client should take the channel as an argument? + // TODO(sergiitk): [DESIGN] the key should be hashed (domain + buckets) merged config? + rlqsClientPool.addClient(config.rlqsService()); + + // final GrpcAuthorizationEngine authEngine = new GrpcAuthorizationEngine(config); + return new ServerInterceptor() { + @Override + public Listener interceptCall( + ServerCall call, Metadata headers, ServerCallHandler next) { + // Notes: + // map domain() -> an incarnation of bucket matchers, f.e. new RlqsEngine(domain, matchers). + // shared resource holder, acquire every rpc + // Store RLQS Client or channel in the config as a reference - FilterConfig config ref + // when parse. + // - atomic maybe + // - allocate channel on demand / ref counting + // - and interface to notify service interceptor on shutdown + // - destroy channel when ref count 0 + // potentially many RLQS Clients sharing a channel to grpc RLQS service - + // TODO(sergiitk): [QUESTION] look up how cache is looked up + // now we create filters every RPC. will be change in RBAC. + // we need to avoid recreating filter when config doesn't change + // m: trigger close() after we create new instances + // RBAC filter recreate? - has to be fixed for RBAC + // AI: follow up with Eric on how cache is shared, this changes if we need to cache + // interceptor + // AI: discuss the lifetime of RLQS channel and the cache - needs wider per-lang discussion. + + // Example: + // AuthDecision authResult = authEngine.evaluate(headers, call); + // if (logger.isLoggable(Level.FINE)) { + // logger.log(Level.FINE, + // "Authorization result for serverCall {0}: {1}, matching policy: {2}.", + // new Object[]{call, authResult.decision(), authResult.matchingPolicyName()}); + // } + // if (GrpcAuthorizationEngine.Action.DENY.equals(authResult.decision())) { + // Status status = Status.PERMISSION_DENIED.withDescription("Access Denied"); + // call.close(status, new Metadata()); + // return new ServerCall.Listener(){}; + // } + return next.startCall(call, headers); + } + }; + } + + @VisibleForTesting + static RlqsFilterConfig parseRlqsFilter(RateLimitQuotaFilterConfig rlqsFilterProto) + throws ResourceInvalidException, InvalidProtocolBufferException { + RlqsFilterConfig.Builder builder = RlqsFilterConfig.builder(); + if (rlqsFilterProto.getDomain().isEmpty()) { + throw new ResourceInvalidException("RateLimitQuotaFilterConfig domain is required"); + } + builder.domain(rlqsFilterProto.getDomain()) + .rlqsService(GrpcService.fromEnvoyProto(rlqsFilterProto.getRlqsServer())); + + // TODO(sergiitk): [IMPL] actually parse, move to RlqsBucketSettings.fromProto() + RateLimitQuotaBucketSettings fallbackBucketSettingsProto = unpackAny( + rlqsFilterProto.getBucketMatchers().getOnNoMatch().getAction().getTypedConfig(), + RateLimitQuotaBucketSettings.class); + RlqsBucketSettings fallbackBucket = RlqsBucketSettings.create( + ImmutableMap.of("bucket_id", headers -> "hello"), + fallbackBucketSettingsProto.getReportingInterval()); + + // TODO(sergiitk): [IMPL] actually parse, move to Matcher.fromProto() + Matcher bucketMatchers = new Matcher() { + @Nullable + @Override + public MatcherList matcherList() { + return null; + } + + @Override + public OnMatch onNoMatch() { + return OnMatch.ofAction(fallbackBucket); + } + }; + + return builder.bucketMatchers(bucketMatchers).build(); + } + + @VisibleForTesting + static RlqsFilterConfig parseRlqsFilterOverride(RateLimitQuotaOverride rlqsFilterProtoOverride) + throws ResourceInvalidException { + RlqsFilterConfig.Builder builder = RlqsFilterConfig.builder(); + // TODO(sergiitk): [IMPL] bucket_matchers. + + return builder.domain(rlqsFilterProtoOverride.getDomain()).build(); + } + + private static T unpackAny( + Message message, Class clazz) throws InvalidProtocolBufferException { + if (!(message instanceof Any)) { + throw new InvalidProtocolBufferException( + "Invalid config type: " + message.getClass().getCanonicalName()); + } + return ((Any) message).unpack(clazz); + } +} diff --git a/xds/src/main/java/io/grpc/xds/RlqsFilterConfig.java b/xds/src/main/java/io/grpc/xds/RlqsFilterConfig.java new file mode 100644 index 00000000000..b9fe1c7e83e --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/RlqsFilterConfig.java @@ -0,0 +1,60 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import com.google.auto.value.AutoValue; +import io.grpc.xds.Filter.FilterConfig; +import io.grpc.xds.internal.datatype.GrpcService; +import io.grpc.xds.internal.matchers.Matcher; +import io.grpc.xds.internal.rlqs.RlqsBucketSettings; +import javax.annotation.Nullable; + +/** Parsed RateLimitQuotaFilterConfig. */ +@AutoValue +abstract class RlqsFilterConfig implements FilterConfig { + + @Override + public final String typeUrl() { + return RlqsFilter.TYPE_URL; + } + + abstract String domain(); + + @Nullable + abstract GrpcService rlqsService(); + + @Nullable + abstract Matcher bucketMatchers(); + + public static Builder builder() { + return new AutoValue_RlqsFilterConfig.Builder(); + } + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder domain(String domain); + + abstract Builder rlqsService(GrpcService rlqsService); + + public abstract Builder bucketMatchers(Matcher matcher); + + abstract RlqsFilterConfig build(); + } + +} diff --git a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java index bf8603fb3e4..0d20eec2000 100644 --- a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java +++ b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java @@ -455,6 +455,13 @@ private void shutdown() { private void updateSelector() { Map> filterChainRouting = new HashMap<>(); + // TODO(sergiitk): [QUESTION] is this a good place to reset interceptors? + // for (FilterChain filterChain : savedRdsRoutingConfigRef.keySet()) { + // if (!resourceName.equals(filterChain.httpConnectionManager().rdsName())) { + // continue; + // } + // + // } savedRdsRoutingConfigRef.clear(); for (FilterChain filterChain: filterChains) { filterChainRouting.put(filterChain, generateRoutingConfig(filterChain)); @@ -512,9 +519,11 @@ private ImmutableMap generatePerRouteInterceptors( FilterConfig filterConfig = namedFilterConfig.filterConfig; Filter filter = filterRegistry.get(filterConfig.typeUrl()); if (filter instanceof ServerInterceptorBuilder) { - ServerInterceptor interceptor = - ((ServerInterceptorBuilder) filter).buildServerInterceptor( - filterConfig, selectedOverrideConfigs.get(namedFilterConfig.name)); + ServerInterceptorBuilder interceptorBuilder = (ServerInterceptorBuilder) filter; + ServerInterceptor interceptor = interceptorBuilder.buildServerInterceptor( + filterConfig, + selectedOverrideConfigs.get(namedFilterConfig.name), + timeService); if (interceptor != null) { filterInterceptors.add(interceptor); } diff --git a/xds/src/main/java/io/grpc/xds/client/XdsResourceType.java b/xds/src/main/java/io/grpc/xds/client/XdsResourceType.java index 8c3d31604e4..0e810d696c1 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsResourceType.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsResourceType.java @@ -132,6 +132,14 @@ public ResourceInvalidException(String message) { public ResourceInvalidException(String message, Throwable cause) { super(cause != null ? message + ": " + cause.getMessage() : message, cause, false, false); } + + public static ResourceInvalidException ofResource(String resourceName, String reason) { + return new ResourceInvalidException("Error parsing " + resourceName + ": " + reason); + } + + public static ResourceInvalidException ofResource(Message proto, String reason) { + return ResourceInvalidException.ofResource(proto.getClass().getCanonicalName(), reason); + } } ValidatedResourceUpdate parse(Args args, List resources) { diff --git a/xds/src/main/java/io/grpc/xds/internal/datatype/GrpcService.java b/xds/src/main/java/io/grpc/xds/internal/datatype/GrpcService.java new file mode 100644 index 00000000000..658544a4291 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/datatype/GrpcService.java @@ -0,0 +1,77 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.datatype; + +import static io.grpc.xds.client.XdsResourceType.ResourceInvalidException; + +import com.google.auto.value.AutoValue; +import com.google.protobuf.Duration; +import javax.annotation.Nullable; + +@AutoValue +public abstract class GrpcService { + public abstract String targetUri(); + + // TODO(sergiitk): [QUESTION] do we need this? + // abstract String statPrefix(); + + // TODO(sergiitk): [IMPL] channelCredentials + // TODO(sergiitk): [IMPL] callCredentials + // TODO(sergiitk): [IMPL] channelArgs + + /** Optional timeout duration for the gRPC request to the service. */ + @Nullable + public abstract Duration timeout(); + + public static GrpcService fromEnvoyProto( + io.envoyproxy.envoy.config.core.v3.GrpcService grpcServiceProto) + throws ResourceInvalidException { + if (grpcServiceProto.getTargetSpecifierCase() + != io.envoyproxy.envoy.config.core.v3.GrpcService.TargetSpecifierCase.GOOGLE_GRPC) { + throw ResourceInvalidException.ofResource(grpcServiceProto, + "Only GoogleGrpc targets supported, got " + grpcServiceProto.getTargetSpecifierCase()); + } + Builder builder = GrpcService.builder(); + if (grpcServiceProto.hasTimeout()) { + builder.timeout(grpcServiceProto.getTimeout()); + } + // GoogleGrpc fields flattened. + io.envoyproxy.envoy.config.core.v3.GrpcService.GoogleGrpc googleGrpcProto = + grpcServiceProto.getGoogleGrpc(); + builder.targetUri(googleGrpcProto.getTargetUri()); + + // TODO(sergiitk): [IMPL] channelCredentials + // TODO(sergiitk): [IMPL] callCredentials + // TODO(sergiitk): [IMPL] channelArgs + // TODO(sergiitk): [IMPL] statPrefix - (maybe) + + return builder.build(); + } + + public static Builder builder() { + return new AutoValue_GrpcService.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder targetUri(String targetUri); + + public abstract Builder timeout(Duration timeout); + + public abstract GrpcService build(); + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/matchers/CelMatcher.java b/xds/src/main/java/io/grpc/xds/internal/matchers/CelMatcher.java new file mode 100644 index 00000000000..fac76106751 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/matchers/CelMatcher.java @@ -0,0 +1,67 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.matchers; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.ImmutableMap; +import dev.cel.common.CelAbstractSyntaxTree; +import dev.cel.runtime.CelEvaluationException; +import dev.cel.runtime.CelRuntime; +import dev.cel.runtime.CelRuntimeFactory; +import java.util.function.Predicate; + +/** Unified Matcher API: xds.type.matcher.v3.CelMatcher. */ +public class CelMatcher implements Predicate { + private static final CelRuntime CEL_RUNTIME = + CelRuntimeFactory.standardCelRuntimeBuilder().build(); + + private final CelRuntime.Program program; + private final String description; + + private CelMatcher(CelAbstractSyntaxTree ast, String description) throws CelEvaluationException { + this.program = CEL_RUNTIME.createProgram(checkNotNull(ast)); + this.description = description != null ? description : ""; + } + + public static CelMatcher create(CelAbstractSyntaxTree ast) throws CelEvaluationException { + return new CelMatcher(ast, null); + } + + public static CelMatcher create(CelAbstractSyntaxTree ast, String description) + throws CelEvaluationException { + return new CelMatcher(ast, description); + } + + public String description() { + return description; + } + + @Override + public boolean test(HttpMatchInput httpMatchInput) { + // if (httpMatchInput.headers().keys().isEmpty()) { + // return false; + // } + // TODO(sergiitk): [IMPL] convert headers to cel args + try { + return (boolean) program.eval(ImmutableMap.of("my_var", "Hello World")); + } catch (CelEvaluationException e) { + // TODO(sergiitk): [IMPL] log cel error? + return false; + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/matchers/HttpMatchInput.java b/xds/src/main/java/io/grpc/xds/internal/matchers/HttpMatchInput.java new file mode 100644 index 00000000000..08cdfacab4f --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/matchers/HttpMatchInput.java @@ -0,0 +1,28 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package io.grpc.xds.internal.matchers; + +import com.google.auto.value.AutoValue; +import io.grpc.Metadata; + +@AutoValue +public abstract class HttpMatchInput { + public abstract Metadata headers(); + // TODO(sergiitk): [IMPL] consider + // public abstract ServerCall serverCall(); +} diff --git a/xds/src/main/java/io/grpc/xds/internal/matchers/Matcher.java b/xds/src/main/java/io/grpc/xds/internal/matchers/Matcher.java new file mode 100644 index 00000000000..7d8cd2ea220 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/matchers/Matcher.java @@ -0,0 +1,31 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.matchers; + +import javax.annotation.Nullable; + +/** Unified Matcher API: xds.type.matcher.v3.Matcher. */ +public abstract class Matcher { + // TODO(sergiitk): [IMPL] iterator? + // TODO(sergiitk): [IMPL] public boolean matches(EvaluateArgs args) ? + + // TODO(sergiitk): [IMPL] AutoOneOf MatcherList, MatcherTree + @Nullable + public abstract MatcherList matcherList(); + + public abstract OnMatch onNoMatch(); +} diff --git a/xds/src/main/java/io/grpc/xds/internal/matchers/MatcherList.java b/xds/src/main/java/io/grpc/xds/internal/matchers/MatcherList.java new file mode 100644 index 00000000000..35062a8c161 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/matchers/MatcherList.java @@ -0,0 +1,22 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.matchers; + +/** Unified Matcher API: xds.type.matcher.v3.Matcher.MatcherList. */ +public class MatcherList { + +} diff --git a/xds/src/main/java/io/grpc/xds/internal/matchers/OnMatch.java b/xds/src/main/java/io/grpc/xds/internal/matchers/OnMatch.java new file mode 100644 index 00000000000..5f845337f81 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/matchers/OnMatch.java @@ -0,0 +1,40 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.matchers; + + +import com.google.auto.value.AutoOneOf; + +/** Unified Matcher API: xds.type.matcher.v3.Matcher.OnMatch. */ +@AutoOneOf(OnMatch.Kind.class) +public abstract class OnMatch { + public enum Kind { MATCHER, ACTION } + + public abstract Kind getKind(); + + public abstract Matcher matcher(); + + public abstract T action(); + + public static OnMatch ofMatcher(Matcher matcher) { + return AutoOneOf_OnMatch.matcher(matcher); + } + + public static OnMatch ofAction(T result) { + return AutoOneOf_OnMatch.action(result); + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsBucketSettings.java b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsBucketSettings.java new file mode 100644 index 00000000000..8fb759817bc --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsBucketSettings.java @@ -0,0 +1,37 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.rlqs; + +import com.google.auto.value.AutoValue; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Duration; +import io.grpc.xds.internal.matchers.HttpMatchInput; + +@AutoValue +public abstract class RlqsBucketSettings { + + public abstract ImmutableMap> bucketIdBuilder(); + + public abstract Duration reportingInterval(); + + public static RlqsBucketSettings create( + ImmutableMap> bucketIdBuilder, + Duration reportingInterval) { + return new AutoValue_RlqsBucketSettings(bucketIdBuilder, reportingInterval); + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsClient.java b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsClient.java new file mode 100644 index 00000000000..f8f639fb03d --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsClient.java @@ -0,0 +1,36 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.rlqs; + +import java.util.logging.Level; +import java.util.logging.Logger; + +final class RlqsClient { + private static final Logger logger = Logger.getLogger(RlqsClient.class.getName()); + + private final String targetUri; + + RlqsClient(String targetUri) { + this.targetUri = targetUri; + } + + + public void shutdown() { + logger.log(Level.FINER, "Shutting down RlqsClient to {0}", targetUri); + // TODO(sergiitk): [IMPL] RlqsClient shutdown + } +} diff --git a/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsClientPool.java b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsClientPool.java new file mode 100644 index 00000000000..5ac01329bfe --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsClientPool.java @@ -0,0 +1,110 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.rlqs; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.Sets; +import io.grpc.SynchronizationContext; +import io.grpc.xds.internal.datatype.GrpcService; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +public final class RlqsClientPool { + private static final Logger logger = Logger.getLogger(RlqsClientPool.class.getName()); + + private static final int DEFAULT_CLEANUP_INTERVAL_SECONDS = 10; + + // TODO(sergiitk): [QUESTION] always in sync context? + private boolean shutdown; + private final SynchronizationContext syncContext = new SynchronizationContext((thread, error) -> { + String message = "Uncaught exception in RlqsClientPool SynchronizationContext. Panic!"; + logger.log(Level.FINE, message, error); + throw new RlqsPoolSynchronizationException(message, error); + }); + + private final ConcurrentHashMap clientPool = new ConcurrentHashMap<>(); + Set clientsToShutdown = Sets.newConcurrentHashSet(); + private final ScheduledExecutorService timeService; + private final int cleanupIntervalSeconds; + + + private RlqsClientPool(ScheduledExecutorService scheduler, int cleanupIntervalSeconds) { + this.timeService = checkNotNull(scheduler, "scheduler"); + checkArgument(cleanupIntervalSeconds >= 0, "cleanupIntervalSeconds < 0"); + this.cleanupIntervalSeconds = + cleanupIntervalSeconds > 0 ? cleanupIntervalSeconds : DEFAULT_CLEANUP_INTERVAL_SECONDS; + } + + /** Creates an instance. */ + public static RlqsClientPool newInstance(ScheduledExecutorService scheduler) { + // TODO(sergiitk): [IMPL] scheduler - consider using GrpcUtil.TIMER_SERVICE. + // TODO(sergiitk): [IMPL] note that the scheduler has a finite lifetime. + return new RlqsClientPool(scheduler, 0); + } + + public void run() { + Runnable cleanupTask = () -> { + if (shutdown) { + return; + } + for (String targetUri : clientsToShutdown) { + clientPool.get(targetUri).shutdown(); + clientPool.remove(targetUri); + } + clientsToShutdown.clear(); + }; + syncContext.schedule(cleanupTask, cleanupIntervalSeconds, TimeUnit.SECONDS, timeService); + } + + public void shutdown() { + syncContext.execute(() -> { + shutdown = true; + logger.log(Level.FINER, "Shutting down RlqsClientPool"); + clientsToShutdown.clear(); + for (String targetUri : clientPool.keySet()) { + clientPool.get(targetUri).shutdown(); + } + clientPool.clear(); + }); + } + + public void addClient(GrpcService rlqsService) { + syncContext.execute(() -> { + RlqsClient rlqsClient = new RlqsClient(rlqsService.targetUri()); + clientPool.put(rlqsService.targetUri(), rlqsClient); + }); + } + + /** + * Throws when fail to bootstrap or initialize the XdsClient. + */ + public static final class RlqsPoolSynchronizationException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public RlqsPoolSynchronizationException(String message, Throwable cause) { + super(message, cause); + } + } + + +} diff --git a/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java b/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java index 55b8812cd17..c2c9aeca488 100644 --- a/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java @@ -79,6 +79,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; @@ -108,19 +109,21 @@ public class XdsServerWrapperTest { private XdsServingStatusListener listener; private FilterChainSelectorManager selectorManager = new FilterChainSelectorManager(); - private FakeClock executor = new FakeClock(); + private final FakeClock executor = new FakeClock(); + private final ScheduledExecutorService timeService = executor.getScheduledExecutorService(); private FakeXdsClient xdsClient = new FakeXdsClient(); private FilterRegistry filterRegistry = FilterRegistry.getDefaultRegistry(); private XdsServerWrapper xdsServerWrapper; private ServerRoutingConfig noopConfig = ServerRoutingConfig.create( ImmutableList.of(), ImmutableMap.of()); + @Before public void setup() { when(mockBuilder.build()).thenReturn(mockServer); xdsServerWrapper = new XdsServerWrapper("0.0.0.0:1", mockBuilder, listener, selectorManager, new FakeXdsClientPoolFactory(xdsClient), - filterRegistry, executor.getScheduledExecutorService()); + filterRegistry, timeService); } @After @@ -1136,9 +1139,10 @@ public ServerCall.Listener interceptCall(ServerCallof()); @@ -1208,10 +1212,13 @@ public ServerCall.Listener interceptCall(ServerCall