Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28359 Improve quota RateLimiter synchronization #5683

Merged
merged 4 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,18 +135,32 @@ protected synchronized long getTimeUnitInMillis() {

/**
* Is there at least one resource available to allow execution?
* @return true if there is at least one resource available, otherwise false
* @return the waitInterval to backoff, or 0 if execution is allowed
*/
public boolean canExecute() {
public long canExecute() {
rmdmattingly marked this conversation as resolved.
Show resolved Hide resolved
return canExecute(1);
}

/**
* Are there enough available resources to allow execution?
* @param amount the number of required resources, a non-negative number
* @return the waitInterval to backoff, or 0 if execution is allowed
*/
public synchronized long canExecute(final long amount) {
rmdmattingly marked this conversation as resolved.
Show resolved Hide resolved
assert amount >= 0;
long waitInterval = waitInterval(amount);
rmdmattingly marked this conversation as resolved.
Show resolved Hide resolved
if (isAvailable(amount) || waitInterval == 0) {
return 0;
}
return waitInterval;
}

/**
* Are there enough available resources to allow execution?
* @param amount the number of required resources, a non-negative number
* @return true if there are enough available resources, otherwise false
*/
public synchronized boolean canExecute(final long amount) {
private synchronized boolean isAvailable(final long amount) {
rmdmattingly marked this conversation as resolved.
Show resolved Hide resolved
if (isBypass()) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,43 +141,47 @@ private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuot
public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit)
throws RpcThrottlingException {
if (!reqsLimiter.canExecute(writeReqs + readReqs)) {
RpcThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
long waitInterval = reqsLimiter.canExecute(writeReqs + readReqs);
if (waitInterval > 0) {
RpcThrottlingException.throwNumRequestsExceeded(waitInterval);
}
if (!reqSizeLimiter.canExecute(estimateWriteSize + estimateReadSize)) {
RpcThrottlingException.throwRequestSizeExceeded(
reqSizeLimiter.waitInterval(estimateWriteSize + estimateReadSize));
waitInterval = reqSizeLimiter.canExecute(estimateWriteSize + estimateReadSize);
if (waitInterval > 0) {
RpcThrottlingException.throwRequestSizeExceeded(waitInterval);
}
if (!reqCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit + estimateReadCapacityUnit)) {
RpcThrottlingException.throwRequestCapacityUnitExceeded(
reqCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit + estimateReadCapacityUnit));
waitInterval =
reqCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit + estimateReadCapacityUnit);
if (waitInterval > 0) {
RpcThrottlingException.throwRequestCapacityUnitExceeded(waitInterval);
}

if (estimateWriteSize > 0) {
if (!writeReqsLimiter.canExecute(writeReqs)) {
RpcThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval());
waitInterval = writeReqsLimiter.canExecute(writeReqs);
if (waitInterval > 0) {
RpcThrottlingException.throwNumWriteRequestsExceeded(waitInterval);
}
if (!writeSizeLimiter.canExecute(estimateWriteSize)) {
RpcThrottlingException
.throwWriteSizeExceeded(writeSizeLimiter.waitInterval(estimateWriteSize));
waitInterval = writeSizeLimiter.canExecute(estimateWriteSize);
if (waitInterval > 0) {
RpcThrottlingException.throwWriteSizeExceeded(waitInterval);
}
if (!writeCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit)) {
RpcThrottlingException.throwWriteCapacityUnitExceeded(
writeCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit));
waitInterval = writeCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit);
if (waitInterval > 0) {
RpcThrottlingException.throwWriteCapacityUnitExceeded(waitInterval);
}
}

if (estimateReadSize > 0) {
if (!readReqsLimiter.canExecute(readReqs)) {
RpcThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval());
waitInterval = readReqsLimiter.canExecute(readReqs);
if (waitInterval > 0) {
RpcThrottlingException.throwNumReadRequestsExceeded(waitInterval);
}
if (!readSizeLimiter.canExecute(estimateReadSize)) {
RpcThrottlingException
.throwReadSizeExceeded(readSizeLimiter.waitInterval(estimateReadSize));
waitInterval = readSizeLimiter.canExecute(estimateReadSize);
if (waitInterval > 0) {
RpcThrottlingException.throwReadSizeExceeded(waitInterval);
}
if (!readCapacityUnitLimiter.canExecute(estimateReadCapacityUnit)) {
RpcThrottlingException.throwReadCapacityUnitExceeded(
readCapacityUnitLimiter.waitInterval(estimateReadCapacityUnit));
waitInterval = readCapacityUnitLimiter.canExecute(estimateReadCapacityUnit);
if (waitInterval > 0) {
RpcThrottlingException.throwReadCapacityUnitExceeded(waitInterval);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ public void testBBSGet() throws Exception {
doPuts(10_000, FAMILY, QUALIFIER, table);
TEST_UTIL.flush(TABLE_NAME);

// Add ~10 block/min limit
// Add ~10 block/sec limit
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.READ_SIZE,
Math.round(10.1 * blockSize), TimeUnit.MINUTES));
Math.round(10.1 * blockSize), TimeUnit.SECONDS));
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);

// should execute at max 10 requests
Expand All @@ -132,10 +132,10 @@ public void testBBSScan() throws Exception {
doPuts(10_000, FAMILY, QUALIFIER, table);
TEST_UTIL.flush(TABLE_NAME);

// Add 1 block/min limit.
// Add 1 block/sec limit.
// This should only allow 1 scan per minute, because we estimate 1 block per scan
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize,
TimeUnit.MINUTES));
TimeUnit.SECONDS));
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
waitMinuteQuota();

Expand All @@ -148,9 +148,9 @@ public void testBBSScan() throws Exception {
testTraffic(() -> doScans(100, table), 100, 0);
testTraffic(() -> doScans(100, table), 100, 0);

// Add ~3 block/min limit. This should support >1 scans
// Add ~3 block/sec limit. This should support >1 scans
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
Math.round(3.1 * blockSize), TimeUnit.MINUTES));
Math.round(3.1 * blockSize), TimeUnit.SECONDS));
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);

// should execute some requests, but not all
Expand All @@ -174,10 +174,10 @@ public void testBBSMultiGet() throws Exception {
doPuts(rowCount, FAMILY, QUALIFIER, table);
TEST_UTIL.flush(TABLE_NAME);

// Add 1 block/min limit.
// Add 1 block/sec limit.
// This should only allow 1 multiget per minute, because we estimate 1 block per multiget
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize,
TimeUnit.MINUTES));
TimeUnit.SECONDS));
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
waitMinuteQuota();

Expand All @@ -190,9 +190,9 @@ public void testBBSMultiGet() throws Exception {
testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);

// Add ~100 block/min limit
// Add ~100 block/sec limit
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
Math.round(100.1 * blockSize), TimeUnit.MINUTES));
Math.round(100.1 * blockSize), TimeUnit.SECONDS));
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);

// should execute approximately 10 batches of 10 requests
Expand All @@ -211,7 +211,7 @@ public void testBBSMultiGet() throws Exception {

private void testTraffic(Callable<Long> trafficCallable, long expectedSuccess, long marginOfError)
throws Exception {
TEST_UTIL.waitFor(90_000, () -> {
TEST_UTIL.waitFor(5_000, () -> {
long actualSuccess;
try {
actualSuccess = trafficCallable.call();
Expand Down
Loading