Skip to content

Commit

Permalink
HBASE-28359 Improve quota RateLimiter synchronization (#5683)
Browse files Browse the repository at this point in the history
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
rmdmattingly authored and bbeaudreault committed Mar 7, 2024
1 parent bf9a8af commit c7eb3d8
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ public long getReadAvailable() {
return readAvailable;
}

@Override
public long getReadConsumed() {
return readConsumed;
}

@Override
public void addGetResult(final Result result) {
operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,9 @@ public void addMutation(final Mutation mutation) {
public long getReadAvailable() {
return Long.MAX_VALUE;
}

@Override
public long getReadConsumed() {
return 0L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,15 @@ public enum OperationType {

/** Returns the number of bytes available to read to avoid exceeding the quota */
long getReadAvailable();

/** Returns the number of bytes consumed from the quota by the operation */
long getReadConsumed();

/**
* Returns the maximum result size to be returned by the given operation. This is the greater of
* two numbers: the bytes available, or the bytes already consumed
*/
default long getMaxResultSize() {
return Math.max(getReadAvailable(), getReadConsumed());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@

/**
* Simple rate limiter. Usage Example: // At this point you have a unlimited resource limiter
* RateLimiter limiter = new AverageIntervalRateLimiter(); or new FixedIntervalRateLimiter();
* limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec while (true) { // call canExecute
* before performing resource consuming operation bool canExecute = limiter.canExecute(); // If
* there are no available resources, wait until one is available if (!canExecute)
* Thread.sleep(limiter.waitInterval()); // ...execute the work and consume the resource...
* limiter.consume(); }
* RateLimiter limiter = new AverageIntervalRateLimiter(); // or new FixedIntervalRateLimiter();
* limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec while (limiter.getWaitIntervalMs > 0)
* { // wait until waitInterval == 0 Thread.sleep(limiter.getWaitIntervalMs()); } // ...execute the
* work and consume the resource... limiter.consume();
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
Expand Down Expand Up @@ -135,18 +133,31 @@ protected synchronized long getTimeUnitInMillis() {

/**
* Is there at least one resource available to allow execution?
* @return true if there is at least one resource available, otherwise false
* @return the waitInterval to backoff, or 0 if execution is allowed
*/
public boolean canExecute() {
return canExecute(1);
public long getWaitIntervalMs() {
return getWaitIntervalMs(1);
}

/**
* Are there enough available resources to allow execution?
* @param amount the number of required resources, a non-negative number
* @return the waitInterval to backoff, or 0 if execution is allowed
*/
public synchronized long getWaitIntervalMs(final long amount) {
assert amount >= 0;
if (!isAvailable(amount)) {
return waitInterval(amount);
}
return 0;
}

/**
* Are there enough available resources to allow execution?
* @param amount the number of required resources, a non-negative number
* @return true if there are enough available resources, otherwise false
*/
public synchronized boolean canExecute(final long amount) {
private boolean isAvailable(final long amount) {
if (isBypass()) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,43 +141,47 @@ private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuot
public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit)
throws RpcThrottlingException {
if (!reqsLimiter.canExecute(writeReqs + readReqs)) {
RpcThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
long waitInterval = reqsLimiter.getWaitIntervalMs(writeReqs + readReqs);
if (waitInterval > 0) {
RpcThrottlingException.throwNumRequestsExceeded(waitInterval);
}
if (!reqSizeLimiter.canExecute(estimateWriteSize + estimateReadSize)) {
RpcThrottlingException.throwRequestSizeExceeded(
reqSizeLimiter.waitInterval(estimateWriteSize + estimateReadSize));
waitInterval = reqSizeLimiter.getWaitIntervalMs(estimateWriteSize + estimateReadSize);
if (waitInterval > 0) {
RpcThrottlingException.throwRequestSizeExceeded(waitInterval);
}
if (!reqCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit + estimateReadCapacityUnit)) {
RpcThrottlingException.throwRequestCapacityUnitExceeded(
reqCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit + estimateReadCapacityUnit));
waitInterval = reqCapacityUnitLimiter
.getWaitIntervalMs(estimateWriteCapacityUnit + estimateReadCapacityUnit);
if (waitInterval > 0) {
RpcThrottlingException.throwRequestCapacityUnitExceeded(waitInterval);
}

if (estimateWriteSize > 0) {
if (!writeReqsLimiter.canExecute(writeReqs)) {
RpcThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval());
waitInterval = writeReqsLimiter.getWaitIntervalMs(writeReqs);
if (waitInterval > 0) {
RpcThrottlingException.throwNumWriteRequestsExceeded(waitInterval);
}
if (!writeSizeLimiter.canExecute(estimateWriteSize)) {
RpcThrottlingException
.throwWriteSizeExceeded(writeSizeLimiter.waitInterval(estimateWriteSize));
waitInterval = writeSizeLimiter.getWaitIntervalMs(estimateWriteSize);
if (waitInterval > 0) {
RpcThrottlingException.throwWriteSizeExceeded(waitInterval);
}
if (!writeCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit)) {
RpcThrottlingException.throwWriteCapacityUnitExceeded(
writeCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit));
waitInterval = writeCapacityUnitLimiter.getWaitIntervalMs(estimateWriteCapacityUnit);
if (waitInterval > 0) {
RpcThrottlingException.throwWriteCapacityUnitExceeded(waitInterval);
}
}

if (estimateReadSize > 0) {
if (!readReqsLimiter.canExecute(readReqs)) {
RpcThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval());
waitInterval = readReqsLimiter.getWaitIntervalMs(readReqs);
if (waitInterval > 0) {
RpcThrottlingException.throwNumReadRequestsExceeded(waitInterval);
}
if (!readSizeLimiter.canExecute(estimateReadSize)) {
RpcThrottlingException
.throwReadSizeExceeded(readSizeLimiter.waitInterval(estimateReadSize));
waitInterval = readSizeLimiter.getWaitIntervalMs(estimateReadSize);
if (waitInterval > 0) {
RpcThrottlingException.throwReadSizeExceeded(waitInterval);
}
if (!readCapacityUnitLimiter.canExecute(estimateReadCapacityUnit)) {
RpcThrottlingException.throwReadCapacityUnitExceeded(
readCapacityUnitLimiter.waitInterval(estimateReadCapacityUnit));
waitInterval = readCapacityUnitLimiter.getWaitIntervalMs(estimateReadCapacityUnit);
if (waitInterval > 0) {
RpcThrottlingException.throwReadCapacityUnitExceeded(waitInterval);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
// doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
// deferred/batched
List<ClientProtos.Action> mutations = null;
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
IOException sizeIOE = null;
ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
ResultOrException.newBuilder();
Expand Down Expand Up @@ -3639,7 +3639,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
}
RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
// now let's do the real scan.
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
RegionScanner scanner = rsh.s;
// this is the limit of rows for this scan, if we the number of rows reach this value, we will
// close the scanner.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ public void testBBSGet() throws Exception {
doPuts(10_000, FAMILY, QUALIFIER, table);
TEST_UTIL.flush(TABLE_NAME);

// Add ~10 block/min limit
// Add ~10 block/sec limit
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.READ_SIZE,
Math.round(10.1 * blockSize), TimeUnit.MINUTES));
Math.round(10.1 * blockSize), TimeUnit.SECONDS));
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);

// should execute at max 10 requests
Expand All @@ -132,10 +132,10 @@ public void testBBSScan() throws Exception {
doPuts(10_000, FAMILY, QUALIFIER, table);
TEST_UTIL.flush(TABLE_NAME);

// Add 1 block/min limit.
// Add 1 block/sec limit.
// This should only allow 1 scan per minute, because we estimate 1 block per scan
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize,
TimeUnit.MINUTES));
TimeUnit.SECONDS));
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
waitMinuteQuota();

Expand All @@ -148,9 +148,9 @@ public void testBBSScan() throws Exception {
testTraffic(() -> doScans(100, table), 100, 0);
testTraffic(() -> doScans(100, table), 100, 0);

// Add ~3 block/min limit. This should support >1 scans
// Add ~3 block/sec limit. This should support >1 scans
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
Math.round(3.1 * blockSize), TimeUnit.MINUTES));
Math.round(3.1 * blockSize), TimeUnit.SECONDS));
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);

// should execute some requests, but not all
Expand All @@ -174,10 +174,10 @@ public void testBBSMultiGet() throws Exception {
doPuts(rowCount, FAMILY, QUALIFIER, table);
TEST_UTIL.flush(TABLE_NAME);

// Add 1 block/min limit.
// Add 1 block/sec limit.
// This should only allow 1 multiget per minute, because we estimate 1 block per multiget
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize,
TimeUnit.MINUTES));
TimeUnit.SECONDS));
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
waitMinuteQuota();

Expand All @@ -190,9 +190,9 @@ public void testBBSMultiGet() throws Exception {
testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);

// Add ~100 block/min limit
// Add ~100 block/sec limit
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
Math.round(100.1 * blockSize), TimeUnit.MINUTES));
Math.round(100.1 * blockSize), TimeUnit.SECONDS));
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);

// should execute approximately 10 batches of 10 requests
Expand All @@ -211,7 +211,7 @@ public void testBBSMultiGet() throws Exception {

private void testTraffic(Callable<Long> trafficCallable, long expectedSuccess, long marginOfError)
throws Exception {
TEST_UTIL.waitFor(90_000, () -> {
TEST_UTIL.waitFor(5_000, () -> {
long actualSuccess;
try {
actualSuccess = trafficCallable.call();
Expand Down
Loading

0 comments on commit c7eb3d8

Please sign in to comment.