diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java index 97cc2f73ec..141c5916f0 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java @@ -31,8 +31,8 @@ import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nonnull; import org.threeten.bp.Duration; @@ -40,6 +40,7 @@ class RateLimitingServerStreamingCallable extends ServerStreamingCallable { + private static final Logger logger = Logger.getLogger(RateLimitingServerStreamingCallable.class.getName()); @@ -64,16 +65,14 @@ class RateLimitingServerStreamingCallable // as the server side cap private static final double MAX_FACTOR = 1.3; - private final RateLimiter limiter; + private final ConditionalRateLimiter limiter; - private final AtomicReference lastQpsChangeTime = new AtomicReference<>(Instant.now()); private final ServerStreamingCallable innerCallable; RateLimitingServerStreamingCallable( @Nonnull ServerStreamingCallable innerCallable) { - this.limiter = RateLimiter.create(DEFAULT_QPS); + this.limiter = new ConditionalRateLimiter(DEFAULT_QPS); this.innerCallable = Preconditions.checkNotNull(innerCallable, "Inner callable must be set"); - logger.info("Rate limiting is enabled with initial QPS of " + limiter.getRate()); } @Override @@ -88,25 +87,126 @@ public void call( ((BigtableTracer) context.getTracer()) .batchRequestThrottled(stopwatch.elapsed(TimeUnit.NANOSECONDS)); } - RateLimitingResponseObserver innerObserver = - new RateLimitingResponseObserver(limiter, lastQpsChangeTime, responseObserver); + RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver(responseObserver); innerCallable.call(request, innerObserver, context); } + /** A rate limiter wrapper class that can be disabled. */ + static class ConditionalRateLimiter { + + private final AtomicBoolean enabled = new AtomicBoolean(false); + + private final RateLimiter limiter; + + // This is the next time allowed to change QPS or disable rate limiting. + private final AtomicReference nextRateUpdateTime = + new AtomicReference<>(Instant.now()); + + public ConditionalRateLimiter(long defaultQps) { + limiter = RateLimiter.create(defaultQps); + logger.info("Rate limiting is initiated (but disabled) with rate of " + defaultQps + " QPS."); + } + + /** + * Works the same way with {@link RateLimiter#acquire()} except that when the rate limiter is + * disabled, {@link ConditionalRateLimiter#acquire()} always returns immediately. + */ + public void acquire() { + if (enabled.get()) { + limiter.acquire(); + } + } + + // Enable rate limiting immediately or disable after the QPS update period. Otherwise, no-op. + + /** + * Disables the rate limier if the current time exceeded the next rate update time. When + * disabled, the rate is retained and will be re-used if re-enabled later. + */ + public void tryDisable() { + // Only disable after the rate update time. + Instant nextTime = nextRateUpdateTime.get(); + Instant now = Instant.now(); + if (now.isAfter(nextTime)) { + boolean wasEnabled = this.enabled.getAndSet(false); + if (wasEnabled) { + logger.info("Rate limiter is disabled."); + } + // No need to update nextRateUpdateTime, any new RateLimitInfo can enable rate limiting and + // update the rate again. + } + } + + /** Enables the rate limiter immediately. */ + public void enable() { + boolean wasEnabled = this.enabled.getAndSet(true); + if (!wasEnabled) { + logger.info("Rate limiter is enabled."); + } + } + + public boolean isEnabled() { + return this.enabled.get(); + } + + public double getRate() { + return limiter.getRate(); + } + + // Update the rate after the QPS update period. Otherwise, no-op. + + /** + * Sets the rate and the next rate update time based on period, if the current time exceeds the + * next rate update time. Otherwise, no-op. + * + * @param rate The new rate of the rate limiter. + * @param period The period during which rate should not be updated again and the rate limiter + * should not be disabled. + */ + public void trySetRate(double rate, Duration period) { + Instant nextTime = nextRateUpdateTime.get(); + Instant now = Instant.now(); + + if (now.isBefore(nextTime)) { + return; + } + + Instant newNextTime = now.plusSeconds(period.getSeconds()); + + if (!nextRateUpdateTime.compareAndSet(nextTime, newNextTime)) { + // Someone else updated it already. + return; + } + final double oldRate = limiter.getRate(); + limiter.setRate(rate); + logger.info( + "Updated max rate from " + + oldRate + + " to " + + rate + + " with period " + + period.getSeconds() + + " seconds."); + } + + @VisibleForTesting + void setEnabled(boolean enabled) { + this.enabled.set(enabled); + } + + @VisibleForTesting + void setRate(double rate) { + limiter.setRate(rate); + } + } + class RateLimitingResponseObserver extends SafeResponseObserver { - private final ResponseObserver outerObserver; - private final RateLimiter rateLimiter; - private final AtomicReference lastQpsChangeTime; + private final ResponseObserver outerObserver; - RateLimitingResponseObserver( - RateLimiter rateLimiter, - AtomicReference lastQpsChangeTime, - ResponseObserver observer) { + RateLimitingResponseObserver(ResponseObserver observer) { super(observer); this.outerObserver = observer; - this.rateLimiter = rateLimiter; - this.lastQpsChangeTime = lastQpsChangeTime; } @Override @@ -114,18 +214,35 @@ protected void onStartImpl(StreamController controller) { outerObserver.onStart(controller); } + private boolean hasValidRateLimitInfo(MutateRowsResponse response) { + // RateLimitInfo is an optional field. However, proto3 sub-message field always + // have presence even thought it's marked as "optional". Check the factor and + // period to make sure they're not 0. + if (!response.hasRateLimitInfo()) { + logger.finest("Response carries no RateLimitInfo"); + return false; + } + + if (response.getRateLimitInfo().getFactor() <= 0 + || response.getRateLimitInfo().getPeriod().getSeconds() <= 0) { + logger.finest("Response carries invalid RateLimitInfo=" + response.getRateLimitInfo()); + return false; + } + + logger.finest("Response carries valid RateLimitInfo=" + response.getRateLimitInfo()); + return true; + } + @Override protected void onResponseImpl(MutateRowsResponse response) { - if (response.hasRateLimitInfo()) { + if (hasValidRateLimitInfo(response)) { + limiter.enable(); RateLimitInfo info = response.getRateLimitInfo(); - // RateLimitInfo is an optional field. However, proto3 sub-message field always - // have presence even thought it's marked as "optional". Check the factor and - // period to make sure they're not 0. - if (info.getFactor() != 0 && info.getPeriod().getSeconds() != 0) { - updateQps( - info.getFactor(), - Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod()))); - } + updateQps( + info.getFactor(), + Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod()))); + } else { + limiter.tryDisable(); } outerObserver.onResponse(response); } @@ -148,28 +265,35 @@ protected void onCompleteImpl() { } private void updateQps(double factor, Duration period) { - Instant lastTime = lastQpsChangeTime.get(); - Instant now = Instant.now(); - - if (now.minus(period).isAfter(lastTime) && lastQpsChangeTime.compareAndSet(lastTime, now)) { - double cappedFactor = Math.min(Math.max(factor, MIN_FACTOR), MAX_FACTOR); - double currentRate = limiter.getRate(); - limiter.setRate(Math.min(Math.max(currentRate * cappedFactor, MIN_QPS), MAX_QPS)); - logger.log( - Level.FINE, - "Updated QPS from {0} to {1}, server returned factor is {2}, capped factor is {3}", - new Object[] {currentRate, limiter.getRate(), factor, cappedFactor}); - } + double cappedFactor = Math.min(Math.max(factor, MIN_FACTOR), MAX_FACTOR); + double currentRate = limiter.getRate(); + double cappedRate = Math.min(Math.max(currentRate * cappedFactor, MIN_QPS), MAX_QPS); + limiter.trySetRate(cappedRate, period); } } @VisibleForTesting - AtomicReference getLastQpsChangeTime() { - return lastQpsChangeTime; + AtomicReference getNextRateUpdateTime() { + return limiter.nextRateUpdateTime; } @VisibleForTesting double getCurrentRate() { return limiter.getRate(); } + + @VisibleForTesting + void setRate(double rate) { + limiter.setRate(rate); + } + + @VisibleForTesting + boolean getLimiterEnabled() { + return limiter.isEnabled(); + } + + @VisibleForTesting + void setLimiterEnabled(boolean enabled) { + limiter.setEnabled(enabled); + } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingCallableTest.java index 92b93cfafe..f2fe77725d 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingCallableTest.java @@ -17,6 +17,8 @@ package com.google.cloud.bigtable.data.v2.stub; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.rpc.ApiCallContext; @@ -59,21 +61,90 @@ public void setup() throws Exception { } @Test - public void testWithRateLimitInfo() throws Exception { + public void testDefaultSettingOnInitiate() throws Exception { callableToTest.call(request, responseObserver, context); + assertFalse(callableToTest.getLimiterEnabled()); + assertThat(callableToTest.getCurrentRate()).isEqualTo(10); + } + + @Test + public void testUpdateRate() throws Exception { + callableToTest.call(request, responseObserver, context); + callableToTest.setLimiterEnabled(true); Instant earlier = Instant.now().minus(org.threeten.bp.Duration.ofHours(1)); - // make sure QPS will be updated - callableToTest.getLastQpsChangeTime().set(earlier); + // Make sure rate will be updated. + callableToTest.getNextRateUpdateTime().set(earlier); double oldQps = callableToTest.getCurrentRate(); double factor = 0.8; + int periodSeconds = 10; + + RateLimitInfo info = + RateLimitInfo.newBuilder() + .setFactor(factor) + .setPeriod(Duration.newBuilder().setSeconds(periodSeconds).build()) + .build(); + + MutateRowsResponse response = MutateRowsResponse.newBuilder().setRateLimitInfo(info).build(); + + innerCallable.getObserver().onResponse(response); + + // Give the thread some time to update the rate. + Thread.sleep(100); + double newQps = callableToTest.getCurrentRate(); + + assertThat(newQps).isWithin(0.01).of(oldQps * factor); + + innerCallable.getObserver().onComplete(); + } + + @Test + public void testNoRateLimitInfoDoesNotUpdateRate() throws Exception { + callableToTest.call(request, responseObserver, context); + callableToTest.setLimiterEnabled(true); + + Instant earlier = Instant.now().minus(org.threeten.bp.Duration.ofHours(1)); + + // Make sure rate will be updated. + callableToTest.getNextRateUpdateTime().set(earlier); + double oldQps = callableToTest.getCurrentRate(); + + // A response without RateLimitInfo. + MutateRowsResponse response = MutateRowsResponse.newBuilder().build(); + + innerCallable.getObserver().onResponse(response); + + // Give the thread some time to update the rate. + Thread.sleep(100); + double newQps = callableToTest.getCurrentRate(); + + assertThat(newQps).isEqualTo(oldQps); // No change + assertFalse(callableToTest.getLimiterEnabled()); // Rate limiter is also disabled. + + innerCallable.getObserver().onComplete(); + } + + @Test + public void testInvalidRateLimitInfoDoesNotUpdateRate() throws Exception { + callableToTest.call(request, responseObserver, context); + callableToTest.setLimiterEnabled(true); + + Instant earlier = Instant.now().minus(org.threeten.bp.Duration.ofHours(1)); + + // make sure QPS will be updated + callableToTest.getNextRateUpdateTime().set(earlier); + double oldQps = callableToTest.getCurrentRate(); + + // A response with invalid RateLimitInfo. + double factor = 0; // Invalid factor + int periodSeconds = 10; RateLimitInfo info = RateLimitInfo.newBuilder() .setFactor(factor) - .setPeriod(Duration.newBuilder().setSeconds(10).build()) + .setPeriod(Duration.newBuilder().setSeconds(periodSeconds).build()) .build(); MutateRowsResponse response = MutateRowsResponse.newBuilder().setRateLimitInfo(info).build(); @@ -84,37 +155,132 @@ public void testWithRateLimitInfo() throws Exception { Thread.sleep(100); double newQps = callableToTest.getCurrentRate(); - assertThat(newQps).isWithin(0.1).of(oldQps * factor); + assertThat(newQps).isEqualTo(oldQps); // No change + assertFalse(callableToTest.getLimiterEnabled()); // Rate limiter is also disabled. + + innerCallable.getObserver().onComplete(); + } + + @Test + public void testMissingRateLimitInfoFactorDoesNotUpdateRate() throws Exception { + callableToTest.call(request, responseObserver, context); + callableToTest.setLimiterEnabled(true); + + Instant earlier = Instant.now().minus(org.threeten.bp.Duration.ofHours(1)); + + // Make sure rate can be updated. + callableToTest.getNextRateUpdateTime().set(earlier); + double oldQps = callableToTest.getCurrentRate(); + + // A response with invalid RateLimitInfo. + // Missing factor is equivalent to 0. + int periodSeconds = 10; + RateLimitInfo info = + RateLimitInfo.newBuilder() + .setPeriod(Duration.newBuilder().setSeconds(periodSeconds).build()) + .build(); + + MutateRowsResponse response = MutateRowsResponse.newBuilder().setRateLimitInfo(info).build(); + + innerCallable.getObserver().onResponse(response); + + // Give the thread some time to update the rate. + Thread.sleep(100); + double newQps = callableToTest.getCurrentRate(); + + assertThat(newQps).isEqualTo(oldQps); // No change + assertFalse(callableToTest.getLimiterEnabled()); // Rate limiter is also disabled. innerCallable.getObserver().onComplete(); } @Test - public void testNoUpdateWithinPeriod() throws Exception { + public void testNoUpdateBeforeAllowedTime() throws Exception { callableToTest.call(request, responseObserver, context); + callableToTest.setLimiterEnabled(true); - Instant now = Instant.now(); - // make sure QPS will not be updated - callableToTest.getLastQpsChangeTime().set(now); + Instant later = Instant.now().plus(org.threeten.bp.Duration.ofHours(1)); + // Make sure rate will not be updated. + callableToTest.getNextRateUpdateTime().set(later); double oldQps = callableToTest.getCurrentRate(); double factor = 0.3; + int periodSeconds = 10; RateLimitInfo info = RateLimitInfo.newBuilder() .setFactor(factor) - .setPeriod(Duration.newBuilder().setSeconds(600).build()) + .setPeriod(Duration.newBuilder().setSeconds(periodSeconds).build()) .build(); MutateRowsResponse response = MutateRowsResponse.newBuilder().setRateLimitInfo(info).build(); innerCallable.getObserver().onResponse(response); - // Give the thread sometime to update the QPS + // Give the thread some time to update the rate. + Thread.sleep(100); + double newQps = callableToTest.getCurrentRate(); + + assertThat(newQps).isEqualTo(oldQps); // No change. + assertTrue(callableToTest.getLimiterEnabled()); // Still enabled. + + innerCallable.getObserver().onComplete(); + } + + @Test + public void testDoesNotDisableBeforeAllowedTime() throws Exception { + callableToTest.call(request, responseObserver, context); + callableToTest.setLimiterEnabled(true); + + Instant later = Instant.now().plus(org.threeten.bp.Duration.ofHours(1)); + // Make sure limiter will not be disabled. + callableToTest.getNextRateUpdateTime().set(later); + double oldQps = callableToTest.getCurrentRate(); + + // Missing RateLimitInfo disables rate limiting. + MutateRowsResponse response = MutateRowsResponse.newBuilder().build(); + + innerCallable.getObserver().onResponse(response); + + // Give the thread sometime to disable the rate limiter. + Thread.sleep(100); + double newQps = callableToTest.getCurrentRate(); + + assertThat(newQps).isEqualTo(oldQps); // No change on QPS. + assertTrue(callableToTest.getLimiterEnabled()); // Still enabled. + + innerCallable.getObserver().onComplete(); + } + + @Test + public void testEnableWithinPeriodDoesNotUpdateRate() throws Exception { + callableToTest.call(request, responseObserver, context); + callableToTest.setRate(1.5); + + Instant later = Instant.now().plus(org.threeten.bp.Duration.ofHours(1)); + // Even though the rate update time is far in the future, enable is always allowed. + callableToTest.getNextRateUpdateTime().set(later); + double oldQps = callableToTest.getCurrentRate(); + + double factor = 0.3; + int periodSeconds = 600; + + RateLimitInfo info = + RateLimitInfo.newBuilder() + .setFactor(factor) + .setPeriod(Duration.newBuilder().setSeconds(periodSeconds).build()) + .build(); + + MutateRowsResponse response = MutateRowsResponse.newBuilder().setRateLimitInfo(info).build(); + + innerCallable.getObserver().onResponse(response); + + // Give the thread some time to enable the rate limiter. Thread.sleep(100); double newQps = callableToTest.getCurrentRate(); - assertThat(newQps).isEqualTo(oldQps); + assertThat(newQps).isEqualTo(oldQps); // No change on QPS due to QPS update time. + assertTrue(callableToTest.getLimiterEnabled()); // Rate limiting is enabled. innerCallable.getObserver().onComplete(); } @@ -126,7 +292,7 @@ public void testErrorInfoLowerQPS() throws Exception { Instant earlier = Instant.now().minus(org.threeten.bp.Duration.ofHours(1)); // make sure QPS will be updated - callableToTest.getLastQpsChangeTime().set(earlier); + callableToTest.getNextRateUpdateTime().set(earlier); double oldQps = callableToTest.getCurrentRate(); innerCallable