Skip to content

Commit

Permalink
Rate limiting should be ineffective when RateLimitInfo is not present
Browse files Browse the repository at this point in the history
Built a new ConditionalRateLimiter that can be disabled but keep the
rate.

Also moved setRate() into the ConditionalRateLimiter. Update and disable
is throttled based on period from RateLimitInfo.

By default the rate limiter is disabled when initiated.

Enable, disable and update rate will be logged as INFO, as they're
critical to the run of the worker and only logged per 10s by default for
each worker thread.
  • Loading branch information
kongweihan committed May 30, 2024
1 parent 24d8082 commit da5fb62
Show file tree
Hide file tree
Showing 2 changed files with 342 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,16 @@
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import org.threeten.bp.Duration;
import org.threeten.bp.Instant;

class RateLimitingServerStreamingCallable
extends ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> {

private static final Logger logger =
Logger.getLogger(RateLimitingServerStreamingCallable.class.getName());

Expand All @@ -64,16 +65,14 @@ class RateLimitingServerStreamingCallable
// as the server side cap
private static final double MAX_FACTOR = 1.3;

private final RateLimiter limiter;
private final ConditionalRateLimiter limiter;

private final AtomicReference<Instant> lastQpsChangeTime = new AtomicReference<>(Instant.now());
private final ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> innerCallable;

RateLimitingServerStreamingCallable(
@Nonnull ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> innerCallable) {
this.limiter = RateLimiter.create(DEFAULT_QPS);
this.limiter = new ConditionalRateLimiter(DEFAULT_QPS);
this.innerCallable = Preconditions.checkNotNull(innerCallable, "Inner callable must be set");
logger.info("Rate limiting is enabled with initial QPS of " + limiter.getRate());
}

@Override
Expand All @@ -88,44 +87,162 @@ public void call(
((BigtableTracer) context.getTracer())
.batchRequestThrottled(stopwatch.elapsed(TimeUnit.NANOSECONDS));
}
RateLimitingResponseObserver innerObserver =
new RateLimitingResponseObserver(limiter, lastQpsChangeTime, responseObserver);
RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver(responseObserver);
innerCallable.call(request, innerObserver, context);
}

/** A rate limiter wrapper class that can be disabled. */
static class ConditionalRateLimiter {

private final AtomicBoolean enabled = new AtomicBoolean(false);

private final RateLimiter limiter;

// This is the next time allowed to change QPS or disable rate limiting.
private final AtomicReference<Instant> nextRateUpdateTime =
new AtomicReference<>(Instant.now());

public ConditionalRateLimiter(long defaultQps) {
limiter = RateLimiter.create(defaultQps);
logger.info("Rate limiting is initiated (but disabled) with rate of " + defaultQps + " QPS.");
}

/**
* Works the same way with {@link RateLimiter#acquire()} except that when the rate limiter is
* disabled, {@link ConditionalRateLimiter#acquire()} always returns immediately.
*/
public void acquire() {
if (enabled.get()) {
limiter.acquire();
}
}

// Enable rate limiting immediately or disable after the QPS update period. Otherwise, no-op.

/**
* Disables the rate limier if the current time exceeded the next rate update time. When
* disabled, the rate is retained and will be re-used if re-enabled later.
*/
public void tryDisable() {
// Only disable after the rate update time.
Instant nextTime = nextRateUpdateTime.get();
Instant now = Instant.now();
if (now.isAfter(nextTime)) {
boolean wasEnabled = this.enabled.getAndSet(false);
if (wasEnabled) {
logger.info("Rate limiter is disabled.");
}
// No need to update nextRateUpdateTime, any new RateLimitInfo can enable rate limiting and
// update the rate again.
}
}

/** Enables the rate limiter immediately. */
public void enable() {
boolean wasEnabled = this.enabled.getAndSet(true);
if (!wasEnabled) {
logger.info("Rate limiter is enabled.");
}
}

public boolean isEnabled() {
return this.enabled.get();
}

public double getRate() {
return limiter.getRate();
}

// Update the rate after the QPS update period. Otherwise, no-op.

/**
* Sets the rate and the next rate update time based on period, if the current time exceeds the
* next rate update time. Otherwise, no-op.
*
* @param rate The new rate of the rate limiter.
* @param period The period during which rate should not be updated again and the rate limiter
* should not be disabled.
*/
public void trySetRate(double rate, Duration period) {
Instant nextTime = nextRateUpdateTime.get();
Instant now = Instant.now();

if (now.isBefore(nextTime)) {
return;
}

Instant newNextTime = now.plusSeconds(period.getSeconds());

if (!nextRateUpdateTime.compareAndSet(nextTime, newNextTime)) {
// Someone else updated it already.
return;
}
final double oldRate = limiter.getRate();
limiter.setRate(rate);
logger.info(
"Updated max rate from "
+ oldRate
+ " to "
+ rate
+ " with period "
+ period.getSeconds()
+ " seconds.");
}

@VisibleForTesting
void setEnabled(boolean enabled) {
this.enabled.set(enabled);
}

@VisibleForTesting
void setRate(double rate) {
limiter.setRate(rate);
}
}

class RateLimitingResponseObserver extends SafeResponseObserver<MutateRowsResponse> {
private final ResponseObserver<MutateRowsResponse> outerObserver;
private final RateLimiter rateLimiter;

private final AtomicReference<Instant> lastQpsChangeTime;
private final ResponseObserver<MutateRowsResponse> outerObserver;

RateLimitingResponseObserver(
RateLimiter rateLimiter,
AtomicReference<Instant> lastQpsChangeTime,
ResponseObserver<MutateRowsResponse> observer) {
RateLimitingResponseObserver(ResponseObserver<MutateRowsResponse> observer) {
super(observer);
this.outerObserver = observer;
this.rateLimiter = rateLimiter;
this.lastQpsChangeTime = lastQpsChangeTime;
}

@Override
protected void onStartImpl(StreamController controller) {
outerObserver.onStart(controller);
}

private boolean hasValidRateLimitInfo(MutateRowsResponse response) {
// RateLimitInfo is an optional field. However, proto3 sub-message field always
// have presence even thought it's marked as "optional". Check the factor and
// period to make sure they're not 0.
if (!response.hasRateLimitInfo()) {
logger.finest("Response carries no RateLimitInfo");
return false;
}

if (response.getRateLimitInfo().getFactor() <= 0
|| response.getRateLimitInfo().getPeriod().getSeconds() <= 0) {
logger.finest("Response carries invalid RateLimitInfo=" + response.getRateLimitInfo());
return false;
}

logger.finest("Response carries valid RateLimitInfo=" + response.getRateLimitInfo());
return true;
}

@Override
protected void onResponseImpl(MutateRowsResponse response) {
if (response.hasRateLimitInfo()) {
if (hasValidRateLimitInfo(response)) {
limiter.enable();
RateLimitInfo info = response.getRateLimitInfo();
// RateLimitInfo is an optional field. However, proto3 sub-message field always
// have presence even thought it's marked as "optional". Check the factor and
// period to make sure they're not 0.
if (info.getFactor() != 0 && info.getPeriod().getSeconds() != 0) {
updateQps(
info.getFactor(),
Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod())));
}
updateQps(
info.getFactor(),
Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod())));
} else {
limiter.tryDisable();
}
outerObserver.onResponse(response);
}
Expand All @@ -148,28 +265,35 @@ protected void onCompleteImpl() {
}

private void updateQps(double factor, Duration period) {
Instant lastTime = lastQpsChangeTime.get();
Instant now = Instant.now();

if (now.minus(period).isAfter(lastTime) && lastQpsChangeTime.compareAndSet(lastTime, now)) {
double cappedFactor = Math.min(Math.max(factor, MIN_FACTOR), MAX_FACTOR);
double currentRate = limiter.getRate();
limiter.setRate(Math.min(Math.max(currentRate * cappedFactor, MIN_QPS), MAX_QPS));
logger.log(
Level.FINE,
"Updated QPS from {0} to {1}, server returned factor is {2}, capped factor is {3}",
new Object[] {currentRate, limiter.getRate(), factor, cappedFactor});
}
double cappedFactor = Math.min(Math.max(factor, MIN_FACTOR), MAX_FACTOR);
double currentRate = limiter.getRate();
double cappedRate = Math.min(Math.max(currentRate * cappedFactor, MIN_QPS), MAX_QPS);
limiter.trySetRate(cappedRate, period);
}
}

@VisibleForTesting
AtomicReference<Instant> getLastQpsChangeTime() {
return lastQpsChangeTime;
AtomicReference<Instant> getNextRateUpdateTime() {
return limiter.nextRateUpdateTime;
}

@VisibleForTesting
double getCurrentRate() {
return limiter.getRate();
}

@VisibleForTesting
void setRate(double rate) {
limiter.setRate(rate);
}

@VisibleForTesting
boolean getLimiterEnabled() {
return limiter.isEnabled();
}

@VisibleForTesting
void setLimiterEnabled(boolean enabled) {
limiter.setEnabled(enabled);
}
}
Loading

0 comments on commit da5fb62

Please sign in to comment.