Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RLQS implementation prototype #25

Closed
wants to merge 12 commits into from
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ conscrypt = "org.conscrypt:conscrypt-openjdk-uber:2.5.2"
# Cronet (API and Embedded) upgrade is blocked by https://github.com/grpc/grpc-java/issues/10396.
cronet-api = "org.chromium.net:cronet-api:108.5359.79"
cronet-embedded = "org.chromium.net:cronet-embedded:108.5359.79"
dev-cel = "dev.cel:cel:0.4.1"
errorprone-annotations = "com.google.errorprone:error_prone_annotations:2.23.0"
errorprone-core = "com.google.errorprone:error_prone_core:2.23.0"
google-api-protos = "com.google.api.grpc:proto-google-common-protos:2.29.0"
Expand Down
1 change: 1 addition & 0 deletions xds/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ dependencies {
project(':grpc-services'),
project(':grpc-auth'),
project(path: ':grpc-alts', configuration: 'shadow'),
libraries.dev.cel,
libraries.gson,
libraries.re2j,
libraries.auto.value.annotations,
Expand Down
15 changes: 15 additions & 0 deletions xds/src/main/java/io/grpc/xds/Filter.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ interface Filter {
*/
ConfigOrError<? extends FilterConfig> parseFilterConfigOverride(Message rawProtoMessage);

default void shutdown() {
// Implement as needed.
// TODO(sergiitk): [DESIGN] important to cover and discuss in the design.
// TODO(sergiitk): [QUESTION] should it be in ServerInterceptorBuilder?
}

/** Represents an opaque data structure holding configuration for a filter. */
interface FilterConfig {
String typeUrl();
Expand All @@ -68,6 +74,15 @@ interface ServerInterceptorBuilder {
@Nullable
ServerInterceptor buildServerInterceptor(
FilterConfig config, @Nullable FilterConfig overrideConfig);

@Nullable
default ServerInterceptor buildServerInterceptor(
FilterConfig config,
@Nullable FilterConfig overrideConfig,
ScheduledExecutorService scheduler) {
return buildServerInterceptor(config, overrideConfig);
}

}

/** Filter config with instance name. */
Expand Down
3 changes: 2 additions & 1 deletion xds/src/main/java/io/grpc/xds/FilterRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ static synchronized FilterRegistry getDefaultRegistry() {
instance = newRegistry().register(
FaultFilter.INSTANCE,
RouterFilter.INSTANCE,
RbacFilter.INSTANCE);
RbacFilter.INSTANCE,
RlqsFilter.INSTANCE);
}
return instance;
}
Expand Down
244 changes: 244 additions & 0 deletions xds/src/main/java/io/grpc/xds/RlqsFilter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.xds;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.client.XdsResourceType.ResourceInvalidException;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaBucketSettings;
import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaFilterConfig;
import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaOverride;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.xds.Filter.ServerInterceptorBuilder;
import io.grpc.xds.internal.datatype.GrpcService;
import io.grpc.xds.internal.matchers.Matcher;
import io.grpc.xds.internal.matchers.MatcherList;
import io.grpc.xds.internal.matchers.OnMatch;
import io.grpc.xds.internal.rlqs.RlqsBucketSettings;
import io.grpc.xds.internal.rlqs.RlqsClientPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;

/** RBAC Http filter implementation. */
final class RlqsFilter implements Filter, ServerInterceptorBuilder {
// private static final Logger logger = Logger.getLogger(RlqsFilter.class.getName());

static final RlqsFilter INSTANCE = new RlqsFilter();

static final String TYPE_URL = "type.googleapis.com/"
+ "envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaFilterConfig";
static final String TYPE_URL_OVERRIDE_CONFIG = "type.googleapis.com/"
+ "envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaOverride";

private final AtomicReference<RlqsClientPool> rlqsClientPoolRef = new AtomicReference<>();

// RlqsFilter() {
// rlqsClientPool = new RlqsClientPool()
// }

@Override
public String[] typeUrls() {
return new String[]{TYPE_URL, TYPE_URL_OVERRIDE_CONFIG};
}

@Override
public ConfigOrError<RlqsFilterConfig> parseFilterConfig(Message rawProtoMessage) {
try {
RlqsFilterConfig rlqsFilterConfig =
parseRlqsFilter(unpackAny(rawProtoMessage, RateLimitQuotaFilterConfig.class));
return ConfigOrError.fromConfig(rlqsFilterConfig);
} catch (InvalidProtocolBufferException e) {
return ConfigOrError.fromError("Can't unpack RateLimitQuotaFilterConfig proto: " + e);
} catch (ResourceInvalidException e) {
return ConfigOrError.fromError(e.getMessage());
}
}

@Override
public ConfigOrError<RlqsFilterConfig> parseFilterConfigOverride(Message rawProtoMessage) {
try {
RlqsFilterConfig rlqsFilterConfig =
parseRlqsFilterOverride(unpackAny(rawProtoMessage, RateLimitQuotaOverride.class));
return ConfigOrError.fromConfig(rlqsFilterConfig);
} catch (InvalidProtocolBufferException e) {
return ConfigOrError.fromError("Can't unpack RateLimitQuotaOverride proto: " + e);
} catch (ResourceInvalidException e) {
return ConfigOrError.fromError(e.getMessage());
}
}

@Nullable
@Override
public ServerInterceptor buildServerInterceptor(
FilterConfig config, @Nullable FilterConfig overrideConfig) {
throw new UnsupportedOperationException("ScheduledExecutorService scheduler required");
}

@Override
public ServerInterceptor buildServerInterceptor(
FilterConfig config,
@Nullable FilterConfig overrideConfig,
ScheduledExecutorService scheduler) {
// Called when we get an xds update - when the LRS or RLS changes.
RlqsFilterConfig rlqsFilterConfig = (RlqsFilterConfig) checkNotNull(config, "config");

// Per-route and per-host configuration overrides.
if (overrideConfig != null) {
RlqsFilterConfig rlqsFilterOverride = (RlqsFilterConfig) overrideConfig;
// All fields are inherited from the main config, unless overridden.
RlqsFilterConfig.Builder overrideBuilder = rlqsFilterConfig.toBuilder();
if (!rlqsFilterOverride.domain().isEmpty()) {
overrideBuilder.domain(rlqsFilterOverride.domain());
}
if (rlqsFilterOverride.bucketMatchers() != null) {
overrideBuilder.bucketMatchers(rlqsFilterOverride.bucketMatchers());
}
// Override bucket matchers if not null.
rlqsFilterConfig = overrideBuilder.build();
}

rlqsClientPoolRef.compareAndSet(null, RlqsClientPool.newInstance(scheduler));
return generateRlqsInterceptor(rlqsFilterConfig);
}

@Override
public void shutdown() {
// TODO(sergiitk): [DESIGN] besides shutting down everything, should there
// be per-route interceptor destructors?
RlqsClientPool oldClientPool = rlqsClientPoolRef.getAndUpdate(unused -> null);
if (oldClientPool != null) {
oldClientPool.shutdown();
}
}

@Nullable
private ServerInterceptor generateRlqsInterceptor(RlqsFilterConfig config) {
checkNotNull(config, "config");
checkNotNull(config.rlqsService(), "config.rlqsService");
RlqsClientPool rlqsClientPool = rlqsClientPoolRef.get();
if (rlqsClientPool == null) {
// Being shut down, return no interceptor.
return null;
}
// TODO(sergiitk): [DESIGN] Rlqs client should take the channel as an argument?
// TODO(sergiitk): [DESIGN] the key should be hashed (domain + buckets) merged config?
rlqsClientPool.addClient(config.rlqsService());

// final GrpcAuthorizationEngine authEngine = new GrpcAuthorizationEngine(config);
return new ServerInterceptor() {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
// Notes:
// map domain() -> an incarnation of bucket matchers, f.e. new RlqsEngine(domain, matchers).
// shared resource holder, acquire every rpc
// Store RLQS Client or channel in the config as a reference - FilterConfig config ref
// when parse.
// - atomic maybe
// - allocate channel on demand / ref counting
// - and interface to notify service interceptor on shutdown
// - destroy channel when ref count 0
// potentially many RLQS Clients sharing a channel to grpc RLQS service -
// TODO(sergiitk): [QUESTION] look up how cache is looked up
// now we create filters every RPC. will be change in RBAC.
// we need to avoid recreating filter when config doesn't change
// m: trigger close() after we create new instances
// RBAC filter recreate? - has to be fixed for RBAC
// AI: follow up with Eric on how cache is shared, this changes if we need to cache
// interceptor
// AI: discuss the lifetime of RLQS channel and the cache - needs wider per-lang discussion.

// Example:
// AuthDecision authResult = authEngine.evaluate(headers, call);
// if (logger.isLoggable(Level.FINE)) {
// logger.log(Level.FINE,
// "Authorization result for serverCall {0}: {1}, matching policy: {2}.",
// new Object[]{call, authResult.decision(), authResult.matchingPolicyName()});
// }
// if (GrpcAuthorizationEngine.Action.DENY.equals(authResult.decision())) {
// Status status = Status.PERMISSION_DENIED.withDescription("Access Denied");
// call.close(status, new Metadata());
// return new ServerCall.Listener<ReqT>(){};
// }
return next.startCall(call, headers);
}
};
}

@VisibleForTesting
static RlqsFilterConfig parseRlqsFilter(RateLimitQuotaFilterConfig rlqsFilterProto)
throws ResourceInvalidException, InvalidProtocolBufferException {
RlqsFilterConfig.Builder builder = RlqsFilterConfig.builder();
if (rlqsFilterProto.getDomain().isEmpty()) {
throw new ResourceInvalidException("RateLimitQuotaFilterConfig domain is required");
}
builder.domain(rlqsFilterProto.getDomain())
.rlqsService(GrpcService.fromEnvoyProto(rlqsFilterProto.getRlqsServer()));

// TODO(sergiitk): [IMPL] actually parse, move to RlqsBucketSettings.fromProto()
RateLimitQuotaBucketSettings fallbackBucketSettingsProto = unpackAny(
rlqsFilterProto.getBucketMatchers().getOnNoMatch().getAction().getTypedConfig(),
RateLimitQuotaBucketSettings.class);
RlqsBucketSettings fallbackBucket = RlqsBucketSettings.create(
ImmutableMap.of("bucket_id", headers -> "hello"),
fallbackBucketSettingsProto.getReportingInterval());

// TODO(sergiitk): [IMPL] actually parse, move to Matcher.fromProto()
Matcher<RlqsBucketSettings> bucketMatchers = new Matcher<RlqsBucketSettings>() {
@Nullable
@Override
public MatcherList<RlqsBucketSettings> matcherList() {
return null;
}

@Override
public OnMatch<RlqsBucketSettings> onNoMatch() {
return OnMatch.ofAction(fallbackBucket);
}
};

return builder.bucketMatchers(bucketMatchers).build();
}

@VisibleForTesting
static RlqsFilterConfig parseRlqsFilterOverride(RateLimitQuotaOverride rlqsFilterProtoOverride)
throws ResourceInvalidException {
RlqsFilterConfig.Builder builder = RlqsFilterConfig.builder();
// TODO(sergiitk): [IMPL] bucket_matchers.

return builder.domain(rlqsFilterProtoOverride.getDomain()).build();
}

private static <T extends com.google.protobuf.Message> T unpackAny(
Message message, Class<T> clazz) throws InvalidProtocolBufferException {
if (!(message instanceof Any)) {
throw new InvalidProtocolBufferException(
"Invalid config type: " + message.getClass().getCanonicalName());
}
return ((Any) message).unpack(clazz);
}
}
60 changes: 60 additions & 0 deletions xds/src/main/java/io/grpc/xds/RlqsFilterConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.xds;

import com.google.auto.value.AutoValue;
import io.grpc.xds.Filter.FilterConfig;
import io.grpc.xds.internal.datatype.GrpcService;
import io.grpc.xds.internal.matchers.Matcher;
import io.grpc.xds.internal.rlqs.RlqsBucketSettings;
import javax.annotation.Nullable;

/** Parsed RateLimitQuotaFilterConfig. */
@AutoValue
abstract class RlqsFilterConfig implements FilterConfig {

@Override
public final String typeUrl() {
return RlqsFilter.TYPE_URL;
}

abstract String domain();

@Nullable
abstract GrpcService rlqsService();

@Nullable
abstract Matcher<RlqsBucketSettings> bucketMatchers();

public static Builder builder() {
return new AutoValue_RlqsFilterConfig.Builder();
}

abstract Builder toBuilder();

@AutoValue.Builder
abstract static class Builder {
abstract Builder domain(String domain);

abstract Builder rlqsService(GrpcService rlqsService);

public abstract Builder bucketMatchers(Matcher<RlqsBucketSettings> matcher);

abstract RlqsFilterConfig build();
}

}
15 changes: 12 additions & 3 deletions xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,13 @@ private void shutdown() {

private void updateSelector() {
Map<FilterChain, AtomicReference<ServerRoutingConfig>> filterChainRouting = new HashMap<>();
// TODO(sergiitk): [QUESTION] is this a good place to reset interceptors?
// for (FilterChain filterChain : savedRdsRoutingConfigRef.keySet()) {
// if (!resourceName.equals(filterChain.httpConnectionManager().rdsName())) {
// continue;
// }
//
// }
savedRdsRoutingConfigRef.clear();
for (FilterChain filterChain: filterChains) {
filterChainRouting.put(filterChain, generateRoutingConfig(filterChain));
Expand Down Expand Up @@ -512,9 +519,11 @@ private ImmutableMap<Route, ServerInterceptor> generatePerRouteInterceptors(
FilterConfig filterConfig = namedFilterConfig.filterConfig;
Filter filter = filterRegistry.get(filterConfig.typeUrl());
if (filter instanceof ServerInterceptorBuilder) {
ServerInterceptor interceptor =
((ServerInterceptorBuilder) filter).buildServerInterceptor(
filterConfig, selectedOverrideConfigs.get(namedFilterConfig.name));
ServerInterceptorBuilder interceptorBuilder = (ServerInterceptorBuilder) filter;
ServerInterceptor interceptor = interceptorBuilder.buildServerInterceptor(
filterConfig,
selectedOverrideConfigs.get(namedFilterConfig.name),
timeService);
if (interceptor != null) {
filterInterceptors.add(interceptor);
}
Expand Down
Loading