Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: rate limiting should be ineffective when RateLimitInfo is not present #2243

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,20 @@ If you are using Maven without the BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.39.0')
implementation platform('com.google.cloud:libraries-bom:26.40.0')

implementation 'com.google.cloud:google-cloud-bigtable'
```
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-bigtable:2.39.3'
implementation 'com.google.cloud:google-cloud-bigtable:2.39.4'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigtable" % "2.39.3"
libraryDependencies += "com.google.cloud" % "google-cloud-bigtable" % "2.39.4"
```
<!-- {x-version-update-end} -->

Expand Down Expand Up @@ -541,7 +541,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-bigtable/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-bigtable.svg
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigtable/2.39.3
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigtable/2.39.4
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,16 @@
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import org.threeten.bp.Duration;
import org.threeten.bp.Instant;

class RateLimitingServerStreamingCallable
extends ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> {

private static final Logger logger =
Logger.getLogger(RateLimitingServerStreamingCallable.class.getName());

Expand All @@ -64,16 +65,14 @@ class RateLimitingServerStreamingCallable
// as the server side cap
private static final double MAX_FACTOR = 1.3;

private final RateLimiter limiter;
private final ConditionalRateLimiter limiter;

private final AtomicReference<Instant> lastQpsChangeTime = new AtomicReference<>(Instant.now());
private final ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> innerCallable;

RateLimitingServerStreamingCallable(
@Nonnull ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> innerCallable) {
this.limiter = RateLimiter.create(DEFAULT_QPS);
this.limiter = new ConditionalRateLimiter(DEFAULT_QPS);
this.innerCallable = Preconditions.checkNotNull(innerCallable, "Inner callable must be set");
logger.info("Rate limiting is enabled with initial QPS of " + limiter.getRate());
}

@Override
Expand All @@ -88,44 +87,158 @@ public void call(
((BigtableTracer) context.getTracer())
.batchRequestThrottled(stopwatch.elapsed(TimeUnit.NANOSECONDS));
}
RateLimitingResponseObserver innerObserver =
new RateLimitingResponseObserver(limiter, lastQpsChangeTime, responseObserver);
RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver(responseObserver);
innerCallable.call(request, innerObserver, context);
}

/** A rate limiter wrapper class that can be disabled. */
static class ConditionalRateLimiter {
kongweihan marked this conversation as resolved.
Show resolved Hide resolved

private final AtomicBoolean enabled = new AtomicBoolean(false);

private final RateLimiter limiter;

// This is the next time allowed to change QPS or disable rate limiting.
private final AtomicReference<Instant> nextRateUpdateTime =
new AtomicReference<>(Instant.now());

public ConditionalRateLimiter(long defaultQps) {
limiter = RateLimiter.create(defaultQps);
logger.info("Rate limiting is initiated (but disabled) with rate of " + defaultQps + " QPS.");
}

/**
* Works the same way with {@link RateLimiter#acquire()} except that when the rate limiter is
* disabled, {@link ConditionalRateLimiter#acquire()} always returns immediately.
*/
public void acquire() {
if (enabled.get()) {
limiter.acquire();
}
}

/**
* Disables the rate limier if the current time exceeded the next rate update time. When
* disabled, the rate is retained and will be re-used if re-enabled later.
*/
public void tryDisable() {
// Only disable after the rate update time.
Instant nextTime = nextRateUpdateTime.get();
Instant now = Instant.now();
if (now.isAfter(nextTime)) {
boolean wasEnabled = this.enabled.getAndSet(false);
if (wasEnabled) {
logger.info("Rate limiter is disabled.");
}
// No need to update nextRateUpdateTime, any new RateLimitInfo can enable rate limiting and
// update the rate again.
}
}

/** Enables the rate limiter immediately. */
public void enable() {
boolean wasEnabled = this.enabled.getAndSet(true);
if (!wasEnabled) {
logger.info("Rate limiter is enabled.");
}
}

public boolean isEnabled() {
return this.enabled.get();
}

public double getRate() {
return limiter.getRate();
}

/**
* Sets the rate and the next rate update time based on period, if the current time exceeds the
* next rate update time. Otherwise, no-op.
*
* @param rate The new rate of the rate limiter.
* @param period The period during which rate should not be updated again and the rate limiter
* should not be disabled.
*/
public void trySetRate(double rate, Duration period) {
Instant nextTime = nextRateUpdateTime.get();
Instant now = Instant.now();

if (now.isBefore(nextTime)) {
return;
}

Instant newNextTime = now.plusSeconds(period.getSeconds());

if (!nextRateUpdateTime.compareAndSet(nextTime, newNextTime)) {
// Someone else updated it already.
return;
}
final double oldRate = limiter.getRate();
limiter.setRate(rate);
logger.info(
"Updated max rate from "
+ oldRate
+ " to "
+ rate
+ " with period "
+ period.getSeconds()
+ " seconds.");
}

@VisibleForTesting
void setEnabled(boolean enabled) {
this.enabled.set(enabled);
}

@VisibleForTesting
void setRate(double rate) {
limiter.setRate(rate);
}
}

class RateLimitingResponseObserver extends SafeResponseObserver<MutateRowsResponse> {
private final ResponseObserver<MutateRowsResponse> outerObserver;
private final RateLimiter rateLimiter;

private final AtomicReference<Instant> lastQpsChangeTime;
private final ResponseObserver<MutateRowsResponse> outerObserver;

RateLimitingResponseObserver(
RateLimiter rateLimiter,
AtomicReference<Instant> lastQpsChangeTime,
ResponseObserver<MutateRowsResponse> observer) {
RateLimitingResponseObserver(ResponseObserver<MutateRowsResponse> observer) {
super(observer);
this.outerObserver = observer;
this.rateLimiter = rateLimiter;
this.lastQpsChangeTime = lastQpsChangeTime;
}

@Override
protected void onStartImpl(StreamController controller) {
outerObserver.onStart(controller);
}

private boolean hasValidRateLimitInfo(MutateRowsResponse response) {
// RateLimitInfo is an optional field. However, proto3 sub-message field always
// have presence even thought it's marked as "optional". Check the factor and
// period to make sure they're not 0.
if (!response.hasRateLimitInfo()) {
logger.finest("Response carries no RateLimitInfo");
return false;
}

if (response.getRateLimitInfo().getFactor() <= 0
|| response.getRateLimitInfo().getPeriod().getSeconds() <= 0) {
logger.finest("Response carries invalid RateLimitInfo=" + response.getRateLimitInfo());
return false;
}

logger.finest("Response carries valid RateLimitInfo=" + response.getRateLimitInfo());
return true;
}

@Override
protected void onResponseImpl(MutateRowsResponse response) {
if (response.hasRateLimitInfo()) {
if (hasValidRateLimitInfo(response)) {
limiter.enable();
RateLimitInfo info = response.getRateLimitInfo();
// RateLimitInfo is an optional field. However, proto3 sub-message field always
// have presence even thought it's marked as "optional". Check the factor and
// period to make sure they're not 0.
if (info.getFactor() != 0 && info.getPeriod().getSeconds() != 0) {
updateQps(
info.getFactor(),
Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod())));
}
updateQps(
info.getFactor(),
Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod())));
} else {
limiter.tryDisable();
}
outerObserver.onResponse(response);
}
Expand All @@ -148,28 +261,35 @@ protected void onCompleteImpl() {
}

private void updateQps(double factor, Duration period) {
Instant lastTime = lastQpsChangeTime.get();
Instant now = Instant.now();

if (now.minus(period).isAfter(lastTime) && lastQpsChangeTime.compareAndSet(lastTime, now)) {
double cappedFactor = Math.min(Math.max(factor, MIN_FACTOR), MAX_FACTOR);
double currentRate = limiter.getRate();
limiter.setRate(Math.min(Math.max(currentRate * cappedFactor, MIN_QPS), MAX_QPS));
logger.log(
Level.FINE,
"Updated QPS from {0} to {1}, server returned factor is {2}, capped factor is {3}",
new Object[] {currentRate, limiter.getRate(), factor, cappedFactor});
}
double cappedFactor = Math.min(Math.max(factor, MIN_FACTOR), MAX_FACTOR);
double currentRate = limiter.getRate();
double cappedRate = Math.min(Math.max(currentRate * cappedFactor, MIN_QPS), MAX_QPS);
limiter.trySetRate(cappedRate, period);
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
}
}

@VisibleForTesting
AtomicReference<Instant> getLastQpsChangeTime() {
return lastQpsChangeTime;
AtomicReference<Instant> getNextRateUpdateTime() {
return limiter.nextRateUpdateTime;
}

@VisibleForTesting
double getCurrentRate() {
return limiter.getRate();
}

@VisibleForTesting
void setRate(double rate) {
limiter.setRate(rate);
}

@VisibleForTesting
boolean getLimiterEnabled() {
return limiter.isEnabled();
}

@VisibleForTesting
void setLimiterEnabled(boolean enabled) {
limiter.setEnabled(enabled);
}
}
Loading
Loading