diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java index 97cc2f73ec..3115510adc 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java @@ -31,6 +31,7 @@ import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -64,16 +65,17 @@ class RateLimitingServerStreamingCallable // as the server side cap private static final double MAX_FACTOR = 1.3; - private final RateLimiter limiter; + private final ConditionalRateLimiter limiter; private final AtomicReference lastQpsChangeTime = new AtomicReference<>(Instant.now()); private final ServerStreamingCallable innerCallable; RateLimitingServerStreamingCallable( @Nonnull ServerStreamingCallable innerCallable) { - this.limiter = RateLimiter.create(DEFAULT_QPS); + this.limiter = new ConditionalRateLimiter(DEFAULT_QPS); this.innerCallable = Preconditions.checkNotNull(innerCallable, "Inner callable must be set"); - logger.info("Rate limiting is enabled with initial QPS of " + limiter.getRate()); + logger.info("BatchWriteFlowControl: Rate limiting callable is initiated with QPS of " + + limiter.getRate()); } @Override @@ -93,14 +95,57 @@ public void call( innerCallable.call(request, innerObserver, context); } + class ConditionalRateLimiter { + + private AtomicBoolean enabled = new AtomicBoolean(true); + + private final RateLimiter limiter; + + public ConditionalRateLimiter(long defaultQps) { + limiter = RateLimiter.create(defaultQps); + } + + public double acquire() { + if (enabled.get()) { + return limiter.acquire(); + } else { + return 0; + } + } + + public void setEnabled(boolean enabled) { + boolean wasEnabled = this.enabled.getAndSet(enabled); + if (wasEnabled && !enabled) { + logger.fine("BatchWriteFlowControl: rate limiter disabled."); + } + if (!wasEnabled && enabled) { + logger.fine("BatchWriteFlowControl: rate limiter enabled."); + } + } + + public boolean isEnabled() { + return this.enabled.get(); + } + + public double getRate() { + return limiter.getRate(); + } + + public void setRate(double rate) { + logger.fine( + "BatchWriteFlowControl: updated rate from " + limiter.getRate() + " to " + rate); + limiter.setRate(rate); + } + } + class RateLimitingResponseObserver extends SafeResponseObserver { private final ResponseObserver outerObserver; - private final RateLimiter rateLimiter; + private final ConditionalRateLimiter rateLimiter; private final AtomicReference lastQpsChangeTime; RateLimitingResponseObserver( - RateLimiter rateLimiter, + ConditionalRateLimiter rateLimiter, AtomicReference lastQpsChangeTime, ResponseObserver observer) { super(observer); @@ -116,7 +161,10 @@ protected void onStartImpl(StreamController controller) { @Override protected void onResponseImpl(MutateRowsResponse response) { + logger.fine( + "BatchWriteFlowControl: response contains RateLimitInfo=" + response.hasRateLimitInfo()); if (response.hasRateLimitInfo()) { + limiter.setEnabled(true); RateLimitInfo info = response.getRateLimitInfo(); // RateLimitInfo is an optional field. However, proto3 sub-message field always // have presence even thought it's marked as "optional". Check the factor and @@ -126,6 +174,8 @@ protected void onResponseImpl(MutateRowsResponse response) { info.getFactor(), Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod()))); } + } else { + limiter.setEnabled(false); } outerObserver.onResponse(response); } @@ -158,7 +208,7 @@ private void updateQps(double factor, Duration period) { logger.log( Level.FINE, "Updated QPS from {0} to {1}, server returned factor is {2}, capped factor is {3}", - new Object[] {currentRate, limiter.getRate(), factor, cappedFactor}); + new Object[]{currentRate, limiter.getRate(), factor, cappedFactor}); } } }