Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport "HBASE-28672 Ensure large batches are not indefinitely blocked by quotas (#6003)" to branch-3 #6057

Merged
merged 1 commit into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,16 @@ private void checkQuota(long numWrites, long numReads) throws RpcThrottlingExcep
continue;
}

limiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed,
writeCapacityUnitConsumed, readCapacityUnitConsumed);
long maxRequestsToEstimate = limiter.getRequestNumLimit();
long maxReadsToEstimate = Math.min(maxRequestsToEstimate, limiter.getReadNumLimit());
long maxWritesToEstimate = Math.min(maxRequestsToEstimate, limiter.getWriteNumLimit());
long maxReadSizeToEstimate = Math.min(readConsumed, limiter.getReadLimit());
long maxWriteSizeToEstimate = Math.min(writeConsumed, limiter.getWriteLimit());

limiter.checkQuota(Math.min(maxWritesToEstimate, numWrites),
Math.min(maxWriteSizeToEstimate, writeConsumed), Math.min(maxReadsToEstimate, numReads),
Math.min(maxReadSizeToEstimate, readConsumed), writeCapacityUnitConsumed,
readCapacityUnitConsumed);
readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ public long getWriteAvailable() {
throw new UnsupportedOperationException();
}

@Override
public long getRequestNumLimit() {
return Long.MAX_VALUE;
}

@Override
public long getReadNumLimit() {
return Long.MAX_VALUE;
}

@Override
public long getWriteNumLimit() {
return Long.MAX_VALUE;
}

@Override
public long getReadAvailable() {
throw new UnsupportedOperationException();
Expand All @@ -75,6 +90,11 @@ public long getReadLimit() {
return Long.MAX_VALUE;
}

@Override
public long getWriteLimit() {
return Long.MAX_VALUE;
}

@Override
public String toString() {
return "NoopQuotaLimiter";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,19 @@ void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
/** Returns the maximum number of bytes ever available to read */
long getReadLimit();

/** Returns the maximum number of bytes ever available to write */
long getWriteLimit();

/** Returns the number of bytes available to write to avoid exceeding the quota */
long getWriteAvailable();

/** Returns the maximum number of requests to allow per TimeUnit */
long getRequestNumLimit();

/** Returns the maximum number of reads to allow per TimeUnit */
long getReadNumLimit();

/** Returns the maximum number of writes to allow per TimeUnit */
long getWriteNumLimit();

}
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,27 @@ public long getWriteAvailable() {
return writeSizeLimiter.getAvailable();
}

@Override
public long getRequestNumLimit() {
long readAndWriteLimit = readReqsLimiter.getLimit() + writeReqsLimiter.getLimit();

if (readAndWriteLimit < 0) { // handle overflow
readAndWriteLimit = Long.MAX_VALUE;
}

return Math.min(reqsLimiter.getLimit(), readAndWriteLimit);
}

@Override
public long getReadNumLimit() {
return readReqsLimiter.getLimit();
}

@Override
public long getWriteNumLimit() {
return writeReqsLimiter.getLimit();
}

@Override
public long getReadAvailable() {
return readSizeLimiter.getAvailable();
Expand All @@ -250,6 +271,11 @@ public long getReadLimit() {
return Math.min(readSizeLimiter.getLimit(), reqSizeLimiter.getLimit());
}

@Override
public long getWriteLimit() {
return Math.min(writeSizeLimiter.getLimit(), reqSizeLimiter.getLimit());
}

@Override
public String toString() {
StringBuilder builder = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,37 @@
package org.apache.hadoop.hbase.quotas;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;

@Category({ RegionServerTests.class, SmallTests.class })
public class TestDefaultOperationQuota {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestDefaultOperationQuota.class);

private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge();
static {
envEdge.setValue(EnvironmentEdgeManager.currentTime());
// only active the envEdge for quotas package
EnvironmentEdgeManagerTestHelper.injectEdgeForPackage(envEdge,
ThrottleQuotaTestUtil.class.getPackage().getName());
}

@Test
public void testScanEstimateNewScanner() {
long blockSize = 64 * 1024;
Expand Down Expand Up @@ -125,4 +141,155 @@ public void testScanEstimateShrinkingWorkload() {
// shrinking workload should only shrink estimate to maxBBS
assertEquals(maxBlockBytesScanned, estimate);
}

@Test
public void testLargeBatchSaturatesReadNumLimit()
throws RpcThrottlingException, InterruptedException {
int limit = 10;
QuotaProtos.Throttle throttle =
QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);

// use the whole limit
quota.checkBatchQuota(0, limit);

// the next request should be rejected
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1));

envEdge.incValue(1000);
// after the TimeUnit, the limit should be refilled
quota.checkBatchQuota(0, limit);
}

@Test
public void testLargeBatchSaturatesReadWriteLimit()
throws RpcThrottlingException, InterruptedException {
int limit = 10;
QuotaProtos.Throttle throttle =
QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);

// use the whole limit
quota.checkBatchQuota(limit, 0);

// the next request should be rejected
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0));

envEdge.incValue(1000);
// after the TimeUnit, the limit should be refilled
quota.checkBatchQuota(limit, 0);
}

@Test
public void testTooLargeReadBatchIsNotBlocked()
throws RpcThrottlingException, InterruptedException {
int limit = 10;
QuotaProtos.Throttle throttle =
QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);

// use more than the limit, which should succeed rather than being indefinitely blocked
quota.checkBatchQuota(0, 10 + limit);

// the next request should be blocked
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1));

envEdge.incValue(1000);
// even after the TimeUnit, the limit should not be refilled because we oversubscribed
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, limit));
}

@Test
public void testTooLargeWriteBatchIsNotBlocked()
throws RpcThrottlingException, InterruptedException {
int limit = 10;
QuotaProtos.Throttle throttle =
QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);

// use more than the limit, which should succeed rather than being indefinitely blocked
quota.checkBatchQuota(10 + limit, 0);

// the next request should be blocked
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0));

envEdge.incValue(1000);
// even after the TimeUnit, the limit should not be refilled because we oversubscribed
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0));
}

@Test
public void testTooLargeWriteSizeIsNotBlocked()
throws RpcThrottlingException, InterruptedException {
int limit = 50;
QuotaProtos.Throttle throttle =
QuotaProtos.Throttle.newBuilder().setWriteSize(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);

// writes are estimated a 100 bytes, so this will use 2x the limit but should not be blocked
quota.checkBatchQuota(1, 0);

// the next request should be blocked
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0));

envEdge.incValue(1000);
// even after the TimeUnit, the limit should not be refilled because we oversubscribed
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0));
}

@Test
public void testTooLargeReadSizeIsNotBlocked()
throws RpcThrottlingException, InterruptedException {
long blockSize = 65536;
long limit = blockSize / 2;
QuotaProtos.Throttle throttle =
QuotaProtos.Throttle.newBuilder().setReadSize(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
DefaultOperationQuota quota =
new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter);

// reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked
quota.checkBatchQuota(0, 1);

// the next request should be blocked
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1));

envEdge.incValue(1000);
// even after the TimeUnit, the limit should not be refilled because we oversubscribed
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1));
}

@Test
public void testTooLargeRequestSizeIsNotBlocked()
throws RpcThrottlingException, InterruptedException {
long blockSize = 65536;
long limit = blockSize / 2;
QuotaProtos.Throttle throttle =
QuotaProtos.Throttle.newBuilder().setReqSize(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
DefaultOperationQuota quota =
new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter);

// reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked
quota.checkBatchQuota(0, 1);

// the next request should be blocked
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1));

envEdge.incValue(1000);
// even after the TimeUnit, the limit should not be refilled because we oversubscribed
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1));
}
}