Skip to content

Commit

Permalink
Rate limiting should be ineffective when RateLimitInfo is not present
Browse files Browse the repository at this point in the history
  • Loading branch information
kongweihan committed May 22, 2024
1 parent 24d8082 commit 4f0fa05
Showing 1 changed file with 56 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -64,16 +65,17 @@ class RateLimitingServerStreamingCallable
// as the server side cap
private static final double MAX_FACTOR = 1.3;

private final RateLimiter limiter;
private final ConditionalRateLimiter limiter;

private final AtomicReference<Instant> lastQpsChangeTime = new AtomicReference<>(Instant.now());
private final ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> innerCallable;

RateLimitingServerStreamingCallable(
@Nonnull ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> innerCallable) {
this.limiter = RateLimiter.create(DEFAULT_QPS);
this.limiter = new ConditionalRateLimiter(DEFAULT_QPS);
this.innerCallable = Preconditions.checkNotNull(innerCallable, "Inner callable must be set");
logger.info("Rate limiting is enabled with initial QPS of " + limiter.getRate());
logger.info("BatchWriteFlowControl: Rate limiting callable is initiated with QPS of "
+ limiter.getRate());
}

@Override
Expand All @@ -93,14 +95,57 @@ public void call(
innerCallable.call(request, innerObserver, context);
}

class ConditionalRateLimiter {

private AtomicBoolean enabled = new AtomicBoolean(true);

private final RateLimiter limiter;

public ConditionalRateLimiter(long defaultQps) {
limiter = RateLimiter.create(defaultQps);
}

public double acquire() {
if (enabled.get()) {
return limiter.acquire();
} else {
return 0;
}
}

public void setEnabled(boolean enabled) {
boolean wasEnabled = this.enabled.getAndSet(enabled);
if (wasEnabled && !enabled) {
logger.fine("BatchWriteFlowControl: rate limiter disabled.");
}
if (!wasEnabled && enabled) {
logger.fine("BatchWriteFlowControl: rate limiter enabled.");
}
}

public boolean isEnabled() {
return this.enabled.get();
}

public double getRate() {
return limiter.getRate();
}

public void setRate(double rate) {
logger.fine(
"BatchWriteFlowControl: updated rate from " + limiter.getRate() + " to " + rate);
limiter.setRate(rate);
}
}

class RateLimitingResponseObserver extends SafeResponseObserver<MutateRowsResponse> {
private final ResponseObserver<MutateRowsResponse> outerObserver;
private final RateLimiter rateLimiter;
private final ConditionalRateLimiter rateLimiter;

private final AtomicReference<Instant> lastQpsChangeTime;

RateLimitingResponseObserver(
RateLimiter rateLimiter,
ConditionalRateLimiter rateLimiter,
AtomicReference<Instant> lastQpsChangeTime,
ResponseObserver<MutateRowsResponse> observer) {
super(observer);
Expand All @@ -116,7 +161,10 @@ protected void onStartImpl(StreamController controller) {

@Override
protected void onResponseImpl(MutateRowsResponse response) {
logger.fine(
"BatchWriteFlowControl: response contains RateLimitInfo=" + response.hasRateLimitInfo());
if (response.hasRateLimitInfo()) {
limiter.setEnabled(true);
RateLimitInfo info = response.getRateLimitInfo();
// RateLimitInfo is an optional field. However, proto3 sub-message field always
// have presence even thought it's marked as "optional". Check the factor and
Expand All @@ -126,6 +174,8 @@ protected void onResponseImpl(MutateRowsResponse response) {
info.getFactor(),
Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod())));
}
} else {
limiter.setEnabled(false);
}
outerObserver.onResponse(response);
}
Expand Down Expand Up @@ -158,7 +208,7 @@ private void updateQps(double factor, Duration period) {
logger.log(
Level.FINE,
"Updated QPS from {0} to {1}, server returned factor is {2}, capped factor is {3}",
new Object[] {currentRate, limiter.getRate(), factor, cappedFactor});
new Object[]{currentRate, limiter.getRate(), factor, cappedFactor});
}
}
}
Expand Down

0 comments on commit 4f0fa05

Please sign in to comment.