diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index 2e26765a6a19..a387c04e4e51 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -111,8 +111,16 @@ private void checkQuota(long numWrites, long numReads) throws RpcThrottlingExcep continue; } - limiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed, - writeCapacityUnitConsumed, readCapacityUnitConsumed); + long maxRequestsToEstimate = limiter.getRequestNumLimit(); + long maxReadsToEstimate = Math.min(maxRequestsToEstimate, limiter.getReadNumLimit()); + long maxWritesToEstimate = Math.min(maxRequestsToEstimate, limiter.getWriteNumLimit()); + long maxReadSizeToEstimate = Math.min(readConsumed, limiter.getReadLimit()); + long maxWriteSizeToEstimate = Math.min(writeConsumed, limiter.getWriteLimit()); + + limiter.checkQuota(Math.min(maxWritesToEstimate, numWrites), + Math.min(maxWriteSizeToEstimate, writeConsumed), Math.min(maxReadsToEstimate, numReads), + Math.min(maxReadSizeToEstimate, readConsumed), writeCapacityUnitConsumed, + readCapacityUnitConsumed); readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java index cf1e49c12e5c..5ece0be2b5aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java @@ -65,6 +65,21 @@ public long getWriteAvailable() { throw new UnsupportedOperationException(); } + @Override + public long getRequestNumLimit() { + return Long.MAX_VALUE; + } + + @Override + public long getReadNumLimit() { + return 
Long.MAX_VALUE; + } + + @Override + public long getWriteNumLimit() { + return Long.MAX_VALUE; + } + @Override public long getReadAvailable() { throw new UnsupportedOperationException(); @@ -75,6 +90,11 @@ public long getReadLimit() { return Long.MAX_VALUE; } + @Override + public long getWriteLimit() { + return Long.MAX_VALUE; + } + @Override public String toString() { return "NoopQuotaLimiter"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java index 8d00a702e253..12e4c4a7c6a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java @@ -79,6 +79,19 @@ void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize, /** Returns the maximum number of bytes ever available to read */ long getReadLimit(); + /** Returns the maximum number of bytes ever available to write */ + long getWriteLimit(); + /** Returns the number of bytes available to write to avoid exceeding the quota */ long getWriteAvailable(); + + /** Returns the maximum number of requests to allow per TimeUnit */ + long getRequestNumLimit(); + + /** Returns the maximum number of reads to allow per TimeUnit */ + long getReadNumLimit(); + + /** Returns the maximum number of writes to allow per TimeUnit */ + long getWriteNumLimit(); + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java index e6e143343f72..f5170b09c83e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java @@ -240,6 +240,27 @@ public long getWriteAvailable() { return writeSizeLimiter.getAvailable(); } + @Override + public long getRequestNumLimit() { + long 
readAndWriteLimit = readReqsLimiter.getLimit() + writeReqsLimiter.getLimit(); + + if (readAndWriteLimit < 0) { // handle overflow + readAndWriteLimit = Long.MAX_VALUE; + } + + return Math.min(reqsLimiter.getLimit(), readAndWriteLimit); + } + + @Override + public long getReadNumLimit() { + return readReqsLimiter.getLimit(); + } + + @Override + public long getWriteNumLimit() { + return writeReqsLimiter.getLimit(); + } + @Override public long getReadAvailable() { return readSizeLimiter.getAvailable(); @@ -250,6 +271,11 @@ public long getReadLimit() { return Math.min(readSizeLimiter.getLimit(), reqSizeLimiter.getLimit()); } + @Override + public long getWriteLimit() { + return Math.min(writeSizeLimiter.getLimit(), reqSizeLimiter.getLimit()); + } + @Override public String toString() { StringBuilder builder = new StringBuilder(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java index 4684be02d69d..a6b7ba6fee59 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java @@ -18,21 +18,37 @@ package org.apache.hadoop.hbase.quotas; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; + @Category({ RegionServerTests.class, SmallTests.class }) public class TestDefaultOperationQuota { @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestDefaultOperationQuota.class); + private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge(); + static { + envEdge.setValue(EnvironmentEdgeManager.currentTime()); + // only activate the envEdge for the quotas package + EnvironmentEdgeManagerTestHelper.injectEdgeForPackage(envEdge, + ThrottleQuotaTestUtil.class.getPackage().getName()); + } + @Test public void testScanEstimateNewScanner() { long blockSize = 64 * 1024; @@ -125,4 +141,155 @@ public void testScanEstimateShrinkingWorkload() { // shrinking workload should only shrink estimate to maxBBS assertEquals(maxBlockBytesScanned, estimate); } + + @Test + public void testLargeBatchSaturatesReadNumLimit() + throws RpcThrottlingException, InterruptedException { + int limit = 10; + QuotaProtos.Throttle throttle = + QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder() + .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); + QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); + DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); + + // use the whole limit + quota.checkBatchQuota(0, limit); + + // the next request should be rejected + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); + + envEdge.incValue(1000); + // after the TimeUnit, the limit should be refilled + quota.checkBatchQuota(0, limit); + } + + @Test + public void testLargeBatchSaturatesReadWriteLimit() + throws RpcThrottlingException, InterruptedException { + int limit = 10; + QuotaProtos.Throttle throttle = + QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder() + 
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); + QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); + DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); + + // use the whole limit + quota.checkBatchQuota(limit, 0); + + // the next request should be rejected + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0)); + + envEdge.incValue(1000); + // after the TimeUnit, the limit should be refilled + quota.checkBatchQuota(limit, 0); + } + + @Test + public void testTooLargeReadBatchIsNotBlocked() + throws RpcThrottlingException, InterruptedException { + int limit = 10; + QuotaProtos.Throttle throttle = + QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder() + .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); + QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); + DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); + + // use more than the limit, which should succeed rather than being indefinitely blocked + quota.checkBatchQuota(0, 10 + limit); + + // the next request should be blocked + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); + + envEdge.incValue(1000); + // even after the TimeUnit, the limit should not be refilled because we oversubscribed + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, limit)); + } + + @Test + public void testTooLargeWriteBatchIsNotBlocked() + throws RpcThrottlingException, InterruptedException { + int limit = 10; + QuotaProtos.Throttle throttle = + QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder() + .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); + QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); + DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, 
limiter); + + // use more than the limit, which should succeed rather than being indefinitely blocked + quota.checkBatchQuota(10 + limit, 0); + + // the next request should be blocked + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0)); + + envEdge.incValue(1000); + // even after the TimeUnit, the limit should not be refilled because we oversubscribed + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0)); + } + + @Test + public void testTooLargeWriteSizeIsNotBlocked() + throws RpcThrottlingException, InterruptedException { + int limit = 50; + QuotaProtos.Throttle throttle = + QuotaProtos.Throttle.newBuilder().setWriteSize(QuotaProtos.TimedQuota.newBuilder() + .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); + QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); + DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); + + // writes are estimated at 100 bytes, so this will use 2x the limit but should not be blocked + quota.checkBatchQuota(1, 0); + + // the next request should be blocked + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0)); + + envEdge.incValue(1000); + // even after the TimeUnit, the limit should not be refilled because we oversubscribed + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0)); + } + + @Test + public void testTooLargeReadSizeIsNotBlocked() + throws RpcThrottlingException, InterruptedException { + long blockSize = 65536; + long limit = blockSize / 2; + QuotaProtos.Throttle throttle = + QuotaProtos.Throttle.newBuilder().setReadSize(QuotaProtos.TimedQuota.newBuilder() + .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); + QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); + DefaultOperationQuota quota = + new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter); + + // reads are estimated 
at 1 block each, so this will use ~2x the limit but should not be blocked + quota.checkBatchQuota(0, 1); + + // the next request should be blocked + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); + + envEdge.incValue(1000); + // even after the TimeUnit, the limit should not be refilled because we oversubscribed + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1)); + } + + @Test + public void testTooLargeRequestSizeIsNotBlocked() + throws RpcThrottlingException, InterruptedException { + long blockSize = 65536; + long limit = blockSize / 2; + QuotaProtos.Throttle throttle = + QuotaProtos.Throttle.newBuilder().setReqSize(QuotaProtos.TimedQuota.newBuilder() + .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); + QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); + DefaultOperationQuota quota = + new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter); + + // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked + quota.checkBatchQuota(0, 1); + + // the next request should be blocked + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); + + envEdge.incValue(1000); + // even after the TimeUnit, the limit should not be refilled because we oversubscribed + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1)); + } }