Skip to content

Commit

Permalink
HBASE-28672 Ensure large batches are not indefinitely blocked by quot…
Browse files Browse the repository at this point in the history
…as (#6003)

Co-authored-by: Ray Mattingly <rmattingly@hubspot.com>
Signed-off-by: Bryan Beaudreault < bbeaudreault@apache.org>
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
  • Loading branch information
2 people authored and ndimiduk committed Jul 10, 2024
1 parent 282f0c0 commit f741873
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,16 @@ private void checkQuota(long numWrites, long numReads) throws RpcThrottlingExcep
continue;
}

limiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed,
writeCapacityUnitConsumed, readCapacityUnitConsumed);
long maxRequestsToEstimate = limiter.getRequestNumLimit();
long maxReadsToEstimate = Math.min(maxRequestsToEstimate, limiter.getReadNumLimit());
long maxWritesToEstimate = Math.min(maxRequestsToEstimate, limiter.getWriteNumLimit());
long maxReadSizeToEstimate = Math.min(readConsumed, limiter.getReadLimit());
long maxWriteSizeToEstimate = Math.min(writeConsumed, limiter.getWriteLimit());

limiter.checkQuota(Math.min(maxWritesToEstimate, numWrites),
Math.min(maxWriteSizeToEstimate, writeConsumed), Math.min(maxReadsToEstimate, numReads),
Math.min(maxReadSizeToEstimate, readConsumed), writeCapacityUnitConsumed,
readCapacityUnitConsumed);
readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ public long getWriteAvailable() {
throw new UnsupportedOperationException();
}

@Override
public long getRequestNumLimit() {
return Long.MAX_VALUE;
}

@Override
public long getReadNumLimit() {
return Long.MAX_VALUE;
}

@Override
public long getWriteNumLimit() {
return Long.MAX_VALUE;
}

@Override
public long getReadAvailable() {
throw new UnsupportedOperationException();
Expand All @@ -75,6 +90,11 @@ public long getReadLimit() {
return Long.MAX_VALUE;
}

@Override
public long getWriteLimit() {
return Long.MAX_VALUE;
}

@Override
public String toString() {
return "NoopQuotaLimiter";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,19 @@ void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
/** Returns the maximum number of bytes ever available to read */
long getReadLimit();

/** Returns the maximum number of bytes ever available to write */
long getWriteLimit();

/** Returns the number of bytes available to write to avoid exceeding the quota */
long getWriteAvailable();

/** Returns the maximum number of requests to allow per TimeUnit */
long getRequestNumLimit();

/** Returns the maximum number of reads to allow per TimeUnit */
long getReadNumLimit();

/** Returns the maximum number of writes to allow per TimeUnit */
long getWriteNumLimit();

}
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,27 @@ public long getWriteAvailable() {
return writeSizeLimiter.getAvailable();
}

@Override
public long getRequestNumLimit() {
long readAndWriteLimit = readReqsLimiter.getLimit() + writeReqsLimiter.getLimit();

if (readAndWriteLimit < 0) { // handle overflow
readAndWriteLimit = Long.MAX_VALUE;
}

return Math.min(reqsLimiter.getLimit(), readAndWriteLimit);
}

@Override
public long getReadNumLimit() {
return readReqsLimiter.getLimit();
}

@Override
public long getWriteNumLimit() {
return writeReqsLimiter.getLimit();
}

@Override
public long getReadAvailable() {
return readSizeLimiter.getAvailable();
Expand All @@ -250,6 +271,11 @@ public long getReadLimit() {
return Math.min(readSizeLimiter.getLimit(), reqSizeLimiter.getLimit());
}

@Override
public long getWriteLimit() {
return Math.min(writeSizeLimiter.getLimit(), reqSizeLimiter.getLimit());
}

@Override
public String toString() {
StringBuilder builder = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,37 @@
package org.apache.hadoop.hbase.quotas;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;

@Category({ RegionServerTests.class, SmallTests.class })
public class TestDefaultOperationQuota {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestDefaultOperationQuota.class);

private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge();
static {
envEdge.setValue(EnvironmentEdgeManager.currentTime());
// only active the envEdge for quotas package
EnvironmentEdgeManagerTestHelper.injectEdgeForPackage(envEdge,
ThrottleQuotaTestUtil.class.getPackage().getName());
}

@Test
public void testScanEstimateNewScanner() {
long blockSize = 64 * 1024;
Expand Down Expand Up @@ -125,4 +141,155 @@ public void testScanEstimateShrinkingWorkload() {
// shrinking workload should only shrink estimate to maxBBS
assertEquals(maxBlockBytesScanned, estimate);
}

@Test
public void testLargeBatchSaturatesReadNumLimit()
throws RpcThrottlingException, InterruptedException {
int limit = 10;
QuotaProtos.Throttle throttle =
QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);

// use the whole limit
quota.checkBatchQuota(0, limit);

// the next request should be rejected
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1));

envEdge.incValue(1000);
// after the TimeUnit, the limit should be refilled
quota.checkBatchQuota(0, limit);
}

@Test
public void testLargeBatchSaturatesReadWriteLimit()
throws RpcThrottlingException, InterruptedException {
int limit = 10;
QuotaProtos.Throttle throttle =
QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);

// use the whole limit
quota.checkBatchQuota(limit, 0);

// the next request should be rejected
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0));

envEdge.incValue(1000);
// after the TimeUnit, the limit should be refilled
quota.checkBatchQuota(limit, 0);
}

@Test
public void testTooLargeReadBatchIsNotBlocked()
throws RpcThrottlingException, InterruptedException {
int limit = 10;
QuotaProtos.Throttle throttle =
QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);

// use more than the limit, which should succeed rather than being indefinitely blocked
quota.checkBatchQuota(0, 10 + limit);

// the next request should be blocked
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1));

envEdge.incValue(1000);
// even after the TimeUnit, the limit should not be refilled because we oversubscribed
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, limit));
}

@Test
public void testTooLargeWriteBatchIsNotBlocked()
throws RpcThrottlingException, InterruptedException {
int limit = 10;
QuotaProtos.Throttle throttle =
QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);

// use more than the limit, which should succeed rather than being indefinitely blocked
quota.checkBatchQuota(10 + limit, 0);

// the next request should be blocked
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0));

envEdge.incValue(1000);
// even after the TimeUnit, the limit should not be refilled because we oversubscribed
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0));
}

@Test
public void testTooLargeWriteSizeIsNotBlocked()
throws RpcThrottlingException, InterruptedException {
int limit = 50;
QuotaProtos.Throttle throttle =
QuotaProtos.Throttle.newBuilder().setWriteSize(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);

// writes are estimated a 100 bytes, so this will use 2x the limit but should not be blocked
quota.checkBatchQuota(1, 0);

// the next request should be blocked
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0));

envEdge.incValue(1000);
// even after the TimeUnit, the limit should not be refilled because we oversubscribed
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0));
}

@Test
public void testTooLargeReadSizeIsNotBlocked()
throws RpcThrottlingException, InterruptedException {
long blockSize = 65536;
long limit = blockSize / 2;
QuotaProtos.Throttle throttle =
QuotaProtos.Throttle.newBuilder().setReadSize(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
DefaultOperationQuota quota =
new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter);

// reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked
quota.checkBatchQuota(0, 1);

// the next request should be blocked
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1));

envEdge.incValue(1000);
// even after the TimeUnit, the limit should not be refilled because we oversubscribed
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1));
}

@Test
public void testTooLargeRequestSizeIsNotBlocked()
throws RpcThrottlingException, InterruptedException {
long blockSize = 65536;
long limit = blockSize / 2;
QuotaProtos.Throttle throttle =
QuotaProtos.Throttle.newBuilder().setReqSize(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
DefaultOperationQuota quota =
new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter);

// reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked
quota.checkBatchQuota(0, 1);

// the next request should be blocked
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1));

envEdge.incValue(1000);
// even after the TimeUnit, the limit should not be refilled because we oversubscribed
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1));
}
}

0 comments on commit f741873

Please sign in to comment.