Skip to content

Commit

Permalink
Rate limiting should be ineffective when RateLimitInfo is not present
Browse files Browse the repository at this point in the history
Built a new ConditionalRateLimiter that can be disabled but keep the
rate.

Also moved setRate() into the ConditionalRateLimiter. Update and disable
is throttled based on period from RateLimitInfo.
  • Loading branch information
kongweihan committed May 29, 2024
1 parent 24d8082 commit b919180
Show file tree
Hide file tree
Showing 2 changed files with 355 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,16 @@
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import org.threeten.bp.Duration;
import org.threeten.bp.Instant;

class RateLimitingServerStreamingCallable
extends ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> {

private static final Logger logger =
Logger.getLogger(RateLimitingServerStreamingCallable.class.getName());

Expand All @@ -64,16 +65,16 @@ class RateLimitingServerStreamingCallable
// as the server side cap
private static final double MAX_FACTOR = 1.3;

private final RateLimiter limiter;
private final ConditionalRateLimiter limiter;

private final AtomicReference<Instant> lastQpsChangeTime = new AtomicReference<>(Instant.now());
private final ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> innerCallable;

RateLimitingServerStreamingCallable(
@Nonnull ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> innerCallable) {
this.limiter = RateLimiter.create(DEFAULT_QPS);
this.limiter = new ConditionalRateLimiter(DEFAULT_QPS);
this.innerCallable = Preconditions.checkNotNull(innerCallable, "Inner callable must be set");
logger.info("Rate limiting is enabled with initial QPS of " + limiter.getRate());
logger.info(
"Rate limiting is initiated (but disabled) with rate of " + limiter.getRate() + " QPS.");
}

@Override
Expand All @@ -88,44 +89,159 @@ public void call(
((BigtableTracer) context.getTracer())
.batchRequestThrottled(stopwatch.elapsed(TimeUnit.NANOSECONDS));
}
RateLimitingResponseObserver innerObserver =
new RateLimitingResponseObserver(limiter, lastQpsChangeTime, responseObserver);
RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver(responseObserver);
innerCallable.call(request, innerObserver, context);
}

/** A rate limiter wrapper class that can be disabled. */
static class ConditionalRateLimiter {

private final AtomicBoolean enabled = new AtomicBoolean(false);

private final RateLimiter limiter;

// This is the next time allowed to change QPS or disable rate limiting.
private final AtomicReference<Instant> nextRateUpdateTime =
new AtomicReference<>(Instant.now());

public ConditionalRateLimiter(long defaultQps) {
limiter = RateLimiter.create(defaultQps);
}

/**
* Works the same way with {@link RateLimiter#acquire()} except that when the rate limiter is
* disabled, {@link ConditionalRateLimiter#acquire()} always returns 0.0 immediately.
*/
public double acquire() {
if (enabled.get()) {
return limiter.acquire();
} else {
return 0;
}
}

// Enable rate limiting immediately or disable after the QPS update period. Otherwise, no-op.

/**
* Disables the rate limier if the current time exceeded the next rate update time. Or, enables
* the rate limiter immediately, setting next rate update time to now. When disabled, the rate
* is retained and will be re-used if re-enabled later.
*
* @param enabled Enable or disable the rate limiter.
*/
public void trySetEnabled(boolean enabled) {
if (enabled) {
// Always enable immediately.
boolean wasEnabled = this.enabled.getAndSet(true);
if (!wasEnabled) {
logger.fine("Rate limiter is enabled.");
}
return;
}
// Only disable after the QPS update period.
Instant nextTime = nextRateUpdateTime.get();
Instant now = Instant.now();
if (now.isAfter(nextTime)) {
boolean wasEnabled = this.enabled.getAndSet(false);
if (wasEnabled) {
logger.fine("Rate limiter is disabled.");
}
}
}

public boolean isEnabled() {
return this.enabled.get();
}

public double getRate() {
return limiter.getRate();
}

// Update the rate after the QPS update period. Otherwise, no-op.

/**
* Sets the rate and the next rate update time based on period, if the current time exceeds the
* next rate update time. Otherwise, no-op.
*
* @param rate The new rate of the rate limiter.
* @param period The period during which rate should not be updated again and the rate limiter
* should not be disabled.
*/
public void trySetRate(double rate, Duration period) {
Instant nextTime = nextRateUpdateTime.get();
Instant now = Instant.now();

if (now.isBefore(nextTime)) {
return;
}

Instant newNextTime = now.plusSeconds(period.getSeconds());

if (!nextRateUpdateTime.compareAndSet(nextTime, newNextTime)) {
// Someone else updated it already.
return;
}
final double oldRate = limiter.getRate();
limiter.setRate(rate);
logger.fine(
"Updated rate from "
+ oldRate
+ " to "
+ rate
+ " with period "
+ period.getSeconds()
+ " seconds.");
}

@VisibleForTesting
void setEnabled(boolean enabled) {
this.enabled.set(enabled);
}
}

class RateLimitingResponseObserver extends SafeResponseObserver<MutateRowsResponse> {
private final ResponseObserver<MutateRowsResponse> outerObserver;
private final RateLimiter rateLimiter;

private final AtomicReference<Instant> lastQpsChangeTime;
private final ResponseObserver<MutateRowsResponse> outerObserver;

RateLimitingResponseObserver(
RateLimiter rateLimiter,
AtomicReference<Instant> lastQpsChangeTime,
ResponseObserver<MutateRowsResponse> observer) {
RateLimitingResponseObserver(ResponseObserver<MutateRowsResponse> observer) {
super(observer);
this.outerObserver = observer;
this.rateLimiter = rateLimiter;
this.lastQpsChangeTime = lastQpsChangeTime;
}

@Override
protected void onStartImpl(StreamController controller) {
outerObserver.onStart(controller);
}

private boolean hasValidRateLimitInfo(MutateRowsResponse response) {
// RateLimitInfo is an optional field. However, proto3 sub-message field always
// have presence even thought it's marked as "optional". Check the factor and
// period to make sure they're not 0.
if (!response.hasRateLimitInfo()) {
logger.finest("Response carries no RateLimitInfo");
return false;
}

if (response.getRateLimitInfo().getFactor() <= 0
|| response.getRateLimitInfo().getPeriod().getSeconds() <= 0) {
logger.finest("Response carries invalid RateLimitInfo=" + response.getRateLimitInfo());
return false;
}

logger.finest("Response carries valid RateLimitInfo=" + response.getRateLimitInfo());
return true;
}

@Override
protected void onResponseImpl(MutateRowsResponse response) {
if (response.hasRateLimitInfo()) {
if (hasValidRateLimitInfo(response)) {
limiter.trySetEnabled(true);
RateLimitInfo info = response.getRateLimitInfo();
// RateLimitInfo is an optional field. However, proto3 sub-message field always
// have presence even thought it's marked as "optional". Check the factor and
// period to make sure they're not 0.
if (info.getFactor() != 0 && info.getPeriod().getSeconds() != 0) {
updateQps(
info.getFactor(),
Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod())));
}
updateQps(
info.getFactor(),
Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod())));
} else {
limiter.trySetEnabled(false);
}
outerObserver.onResponse(response);
}
Expand All @@ -148,28 +264,30 @@ protected void onCompleteImpl() {
}

private void updateQps(double factor, Duration period) {
Instant lastTime = lastQpsChangeTime.get();
Instant now = Instant.now();

if (now.minus(period).isAfter(lastTime) && lastQpsChangeTime.compareAndSet(lastTime, now)) {
double cappedFactor = Math.min(Math.max(factor, MIN_FACTOR), MAX_FACTOR);
double currentRate = limiter.getRate();
limiter.setRate(Math.min(Math.max(currentRate * cappedFactor, MIN_QPS), MAX_QPS));
logger.log(
Level.FINE,
"Updated QPS from {0} to {1}, server returned factor is {2}, capped factor is {3}",
new Object[] {currentRate, limiter.getRate(), factor, cappedFactor});
}
double cappedFactor = Math.min(Math.max(factor, MIN_FACTOR), MAX_FACTOR);
double currentRate = limiter.getRate();
double cappedRate = Math.min(Math.max(currentRate * cappedFactor, MIN_QPS), MAX_QPS);
limiter.trySetRate(cappedRate, period);
}
}

@VisibleForTesting
AtomicReference<Instant> getLastQpsChangeTime() {
return lastQpsChangeTime;
AtomicReference<Instant> getNextRateUpdateTime() {
return limiter.nextRateUpdateTime;
}

@VisibleForTesting
double getCurrentRate() {
return limiter.getRate();
}

@VisibleForTesting
boolean getLimiterEnabled() {
return limiter.isEnabled();
}

@VisibleForTesting
void setLimiterEnabled(boolean enabled) {
limiter.setEnabled(enabled);
}
}
Loading

0 comments on commit b919180

Please sign in to comment.