From 2c083b7b53ddb8a7074a30391456adebc0d917f5 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Wed, 24 Jan 2024 14:22:39 -0500 Subject: [PATCH 1/7] Consume block bytes scanned in operation quota --- .../hbase/quotas/DefaultOperationQuota.java | 38 ++- .../hbase/quotas/ExceedOperationQuota.java | 6 +- .../hbase/quotas/NoopOperationQuota.java | 5 + .../hadoop/hbase/quotas/OperationQuota.java | 10 + .../quotas/RegionServerRpcQuotaManager.java | 20 +- .../hadoop/hbase/regionserver/HRegion.java | 12 + .../hbase/regionserver/RSRpcServices.java | 9 + .../hadoop/hbase/regionserver/Region.java | 6 + .../quotas/TestBlockBytesScannedQuota.java | 220 ++++++++++++++++++ .../hbase/quotas/ThrottleQuotaTestUtil.java | 63 +++++ 10 files changed, 372 insertions(+), 17 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index ddf804243ed8..f8a877579c79 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -49,9 +49,16 @@ public class DefaultOperationQuota implements OperationQuota { protected long readDiff = 0; protected long writeCapacityUnitDiff = 0; protected long readCapacityUnitDiff = 0; + private long blockBytesScanned = 0; + private boolean useBlockBytesScanned; + private long blockSizeBytes; - public DefaultOperationQuota(final Configuration conf, final QuotaLimiter... limiters) { + public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes, + final QuotaLimiter... 
limiters) { this(conf, Arrays.asList(limiters)); + this.useBlockBytesScanned = + conf.getBoolean(OperationQuota.USE_BLOCK_BYTES_SCANNED_KEY, USE_BLOCK_BYTES_SCANNED_DEFAULT); + this.blockSizeBytes = blockSizeBytes; } /** @@ -94,8 +101,15 @@ public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThro public void close() { // Adjust the quota consumed for the specified operation writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed; - readDiff = operationSize[OperationType.GET.ordinal()] - + operationSize[OperationType.SCAN.ordinal()] - readConsumed; + + if (useBlockBytesScanned) { + readDiff = blockBytesScanned - readConsumed; + } else { + long resultSize = + operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()]; + readDiff = resultSize - readConsumed; + } + writeCapacityUnitDiff = calculateWriteCapacityUnitDiff(operationSize[OperationType.MUTATE.ordinal()], writeConsumed); readCapacityUnitDiff = calculateReadCapacityUnitDiff( @@ -132,6 +146,11 @@ public void addMutation(final Mutation mutation) { operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation); } + @Override + public void addBlockBytesScanned(long blockBytesScanned) { + this.blockBytesScanned += blockBytesScanned; + } + /** * Update estimate quota(read/write size/capacityUnits) which will be consumed * @param numWrites the number of write requests @@ -139,9 +158,16 @@ public void addMutation(final Mutation mutation) { * @param numScans the number of scan requests */ protected void updateEstimateConsumeQuota(int numWrites, int numReads, int numScans) { - writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100); - readConsumed = estimateConsume(OperationType.GET, numReads, 100); - readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000); + if (useBlockBytesScanned) { + writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100); + // assume 1 block required for reads. this is probably a low estimate, which is okay + readConsumed = numReads > 0 ? blockSizeBytes : 0; + readConsumed += numScans > 0 ? blockSizeBytes : 0; + } else { + writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100); + readConsumed = estimateConsume(OperationType.GET, numReads, 100); + readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000); + } writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed); readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java index 1b7200f5f22f..1788e550f22a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java @@ -40,9 +40,9 @@ public class ExceedOperationQuota extends DefaultOperationQuota { private static final Logger LOG = LoggerFactory.getLogger(ExceedOperationQuota.class); private QuotaLimiter regionServerLimiter; - public ExceedOperationQuota(final Configuration conf, QuotaLimiter regionServerLimiter, - final QuotaLimiter... limiters) { - super(conf, limiters); + public ExceedOperationQuota(final Configuration conf, int blockSizeBytes, + QuotaLimiter regionServerLimiter, final QuotaLimiter... 
limiters) { + super(conf, blockSizeBytes, limiters); this.regionServerLimiter = regionServerLimiter; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java index 71fc169d671f..178d459a1e8f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java @@ -64,6 +64,11 @@ public void addMutation(final Mutation mutation) { // no-op } + @Override + public void addBlockBytesScanned(long blockBytesScanned) { + // no-op + } + @Override public long getReadAvailable() { return Long.MAX_VALUE; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java index aaae64b6184a..e67e48e73386 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java @@ -35,6 +35,9 @@ public enum OperationType { SCAN } + String USE_BLOCK_BYTES_SCANNED_KEY = "hbase.quota.use.block.bytes.scanned"; + boolean USE_BLOCK_BYTES_SCANNED_DEFAULT = false; + /** * Checks if it is possible to execute the specified operation. The quota will be estimated based * on the number of operations to perform and the average size accumulated during time. @@ -67,6 +70,13 @@ public enum OperationType { */ void addMutation(Mutation mutation); + /** + * Add the block bytes scanned for the given call. This may be used to calculate the exact quota, + * and can be a better representation of workload than result sizes. Set + * {@link #USE_BLOCK_BYTES_SCANNED_KEY} to true to prefer this metric over result size. 
+ */ + void addBlockBytesScanned(long blockBytesScanned); + /** Returns the number of bytes available to read to avoid exceeding the quota */ long getReadAvailable(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java index 4b09c0308f9e..de76303e27ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Optional; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.ipc.RpcScheduler; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.regionserver.Region; @@ -113,7 +114,8 @@ QuotaCache getQuotaCache() { * @param table the table where the operation will be executed * @return the OperationQuota */ - public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) { + public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table, + final int blockSizeBytes) { if (isQuotaEnabled() && !table.isSystemTable() && isRpcThrottleEnabled()) { UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi); QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table); @@ -123,7 +125,8 @@ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName t LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter); } if (!useNoop) { - return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter); + return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes, + userLimiter); } } else { QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString()); @@ -139,11 +142,11 @@ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName t } if (!useNoop) { if (exceedThrottleQuotaEnabled) { - return new ExceedOperationQuota(this.rsServices.getConfiguration(), rsLimiter, - userLimiter, tableLimiter, nsLimiter); + return new ExceedOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes, + rsLimiter, userLimiter, tableLimiter, nsLimiter); } else { - return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter, - tableLimiter, nsLimiter, rsLimiter); + return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes, + userLimiter, tableLimiter, nsLimiter, rsLimiter); } } } @@ -213,9 +216,10 @@ private OperationQuota checkQuota(final Region region, final int numWrites, fina } else { ugi = User.getCurrent().getUGI(); } - TableName table = region.getTableDescriptor().getTableName(); + TableDescriptor tableDescriptor = region.getTableDescriptor(); + TableName table = tableDescriptor.getTableName(); - OperationQuota quota = getQuota(ugi, table); + OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes()); try { quota.checkQuota(numWrites, numReads, numScans); } catch (RpcThrottlingException e) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 0dc96747dd36..643baeb68150 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -458,6 +458,8 @@ public MetricsTableRequests getMetricsTableRequests() { private final CellComparator cellComparator; + private int minBlockSizeBytes = -1; + /** * @return The smallest mvcc readPoint across all the scanners in this region. Writes older than * this readPoint, are included in every read operation. @@ -2047,6 +2049,16 @@ public Configuration getReadOnlyConfiguration() { return new ReadOnlyConfiguration(this.conf); } + @Override + public int getMinBlockSizeBytes() { + if (minBlockSizeBytes > 0) { + return minBlockSizeBytes; + } + minBlockSizeBytes = Arrays.stream(this.htableDescriptor.getColumnFamilies()) + .mapToInt(ColumnFamilyDescriptor::getBlocksize).min().orElse(HConstants.DEFAULT_BLOCKSIZE); + return minBlockSizeBytes; + } + private ThreadPoolExecutor getStoreOpenAndCloseThreadPool(final String threadNamePrefix) { int numStores = Math.max(1, this.htableDescriptor.getColumnFamilyCount()); int maxThreads = Math.min(numStores, conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 0fe6f6476a6c..7eacfd73398a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -690,6 +690,7 @@ private Result increment(final HRegion region, final OperationQuota quota, if (metricsRegionServer != null) { long blockBytesScanned = context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; + quota.addBlockBytesScanned(blockBytesScanned); metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before, blockBytesScanned); } @@ -2506,6 +2507,9 @@ public GetResponse get(final RpcController controller, final GetRequest request) if (r != null && r.rawCells() != null) { quota.addGetResult(r); } + if (context != null) { + quota.addBlockBytesScanned(context.getBlockBytesScanned()); + } return builder.build(); } catch (IOException ie) { throw new ServiceException(ie); @@ -2841,6 +2845,9 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request) spaceQuotaEnforcement); } } finally { + if (context != null) { + quota.addBlockBytesScanned(context.getBlockBytesScanned()); + } quota.close(); } @@ -3041,6 +3048,7 @@ private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota long after = EnvironmentEdgeManager.currentTime(); long blockBytesScanned = context != null ? 
context.getBlockBytesScanned() - blockBytesScannedBefore : 0; + quota.addBlockBytesScanned(blockBytesScanned); metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned); MutationType type = mutation.getMutateType(); @@ -3645,6 +3653,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque } quota.addScanResult(results); + quota.addBlockBytesScanned(rpcCall.getBlockBytesScanned()); addResults(builder, results, (HBaseRpcController) controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()), isClientCellBlockSupport(rpcCall)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 6a897a5b9f36..42069e58092e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -571,4 +571,10 @@ void requestCompaction(byte[] family, String why, int priority, boolean major, * if you try to set a configuration. */ Configuration getReadOnlyConfiguration(); + + /** + * The minimum block size configuration from all relevant column families. This is used when + * estimating quota consumption. + */ + int getMinBlockSizeBytes(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java new file mode 100644 index 000000000000..4f47fb6918c2 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.quotas; + +import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doGets; +import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doMultiGets; +import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doPuts; +import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doScans; +import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerUserCacheRefresh; +import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.waitMinuteQuota; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestBlockBytesScannedQuota { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBlockBytesScannedQuota.class); + + private final static Logger LOG = LoggerFactory.getLogger(TestBlockBytesScannedQuota.class); + + private static final int REFRESH_TIME = 5000; + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static final byte[] FAMILY = Bytes.toBytes("cf"); + private static final byte[] QUALIFIER = Bytes.toBytes("q"); + + private static final TableName TABLE_NAME = TableName.valueOf("BlockBytesScannedQuotaTest"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // client should fail fast + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 10); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + + // quotas enabled, using block bytes scanned + TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME); + TEST_UTIL.getConfiguration().setBoolean(OperationQuota.USE_BLOCK_BYTES_SCANNED_KEY, true); + + // don't cache blocks to make IO predictable + TEST_UTIL.getConfiguration().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); + + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + QuotaCache.TEST_FORCE_REFRESH = true; + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + EnvironmentEdgeManager.reset(); + TEST_UTIL.deleteTable(TABLE_NAME); + TEST_UTIL.shutdownMiniCluster(); + } + + @After + public void tearDown() throws Exception { + ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL); + } + + @Test + public void testBBSGet() throws Exception { + final Admin admin = TEST_UTIL.getAdmin(); + final String userName = User.getCurrent().getShortName(); + int blockSize 
= admin.getDescriptor(TABLE_NAME).getColumnFamily(FAMILY).getBlocksize(); + Table table = admin.getConnection().getTable(TABLE_NAME); + + doPuts(10_000, FAMILY, QUALIFIER, table); + TEST_UTIL.flush(TABLE_NAME); + + // Add ~10 block/min limit + admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, + Math.round(10.1 * blockSize), TimeUnit.MINUTES)); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + + // should execute at max 10 requests + assertEquals(10, doGets(20, FAMILY, QUALIFIER, table)); + + // wait a minute and you should get another 10 requests executed + waitMinuteQuota(); + assertEquals(10, doGets(20, FAMILY, QUALIFIER, table)); + + // Remove all the limits + admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); + triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); + assertEquals(100, doGets(100, FAMILY, QUALIFIER, table)); + assertEquals(100, doGets(100, FAMILY, QUALIFIER, table)); + } + + @Test + public void testBBSScan() throws Exception { + final Admin admin = TEST_UTIL.getAdmin(); + final String userName = User.getCurrent().getShortName(); + int blockSize = admin.getDescriptor(TABLE_NAME).getColumnFamily(FAMILY).getBlocksize(); + Table table = admin.getConnection().getTable(TABLE_NAME); + + doPuts(10_000, FAMILY, QUALIFIER, table); + TEST_UTIL.flush(TABLE_NAME); + + // Add 1 block/min limit. + // This should only allow 1 scan per minute, because we estimate 1 block per scan + admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize, + TimeUnit.MINUTES)); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + waitMinuteQuota(); + + // should execute 1 request + assertEquals(1, doScans(5, table)); + + // Remove all the limits + admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); + triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); + assertEquals(100, doScans(100, table)); + assertEquals(100, doScans(100, table)); + + // Add ~3 block/min limit. This should support >0 scans + admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, + Math.round(3.1 * blockSize), TimeUnit.MINUTES)); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + + // should execute some requests, but not all + long successfulScans = doScans(100, table); + LOG.info("successfulScans = " + successfulScans); + assertTrue(successfulScans < 100); + assertTrue(successfulScans > 0); + + // Remove all the limits + admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); + triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); + assertEquals(100, doScans(100, table)); + assertEquals(100, doScans(100, table)); + } + + @Test + public void testBBSMultiGet() throws Exception { + final Admin admin = TEST_UTIL.getAdmin(); + final String userName = User.getCurrent().getShortName(); + int blockSize = admin.getDescriptor(TABLE_NAME).getColumnFamily(FAMILY).getBlocksize(); + Table table = admin.getConnection().getTable(TABLE_NAME); + int rowCount = 10_000; + + doPuts(rowCount, FAMILY, QUALIFIER, table); + TEST_UTIL.flush(TABLE_NAME); + + // Add 1 block/min limit. 
+ // This should only allow 1 multiget per minute, because we estimate 1 block per scan + admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize, + TimeUnit.MINUTES)); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + waitMinuteQuota(); + + // should execute 1 request + assertEquals(1, doMultiGets(10, 10, rowCount, FAMILY, QUALIFIER, table)); + + // Remove all the limits + admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); + triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); + assertEquals(100, doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table)); + assertEquals(100, doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table)); + + // Add 100 block/min limit + admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, + Math.round(100 * blockSize), TimeUnit.MINUTES)); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + + // should execute approximately 10 batches of 10 requests + long successfulMultiGets = doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table); + assertTrue(successfulMultiGets >= 9); + assertTrue(successfulMultiGets <= 11); + + // wait a minute and you should get another ~10 batches of 10 requests + waitMinuteQuota(); + successfulMultiGets = doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table); + assertTrue(successfulMultiGets >= 9); + assertTrue(successfulMultiGets <= 11); + + // Remove all the limits + admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); + triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); + assertEquals(100, doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table)); + assertEquals(100, doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java index de6f5653ad2c..bc2d0ae0713e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java @@ -18,12 +18,17 @@ package org.apache.hadoop.hbase.quotas; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; +import java.util.Random; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; @@ -105,6 +110,64 @@ static long doGets(int maxOps, final Table... tables) { return count; } + static long doGets(int maxOps, byte[] family, byte[] qualifier, final Table... tables) { + int count = 0; + try { + while (count < maxOps) { + Get get = new Get(Bytes.toBytes("row-" + count)); + get.addColumn(family, qualifier); + for (final Table table : tables) { + table.get(get); + } + count += tables.length; + } + } catch (IOException e) { + LOG.error("get failed after nRetries=" + count, e); + } + return count; + } + + static long doMultiGets(int maxOps, int batchSize, int rowCount, byte[] family, byte[] qualifier, + final Table... 
tables) { + int opCount = 0; + Random random = new Random(); + try { + while (opCount < maxOps) { + List gets = new ArrayList<>(batchSize); + while (gets.size() < batchSize) { + Get get = new Get(Bytes.toBytes("row-" + random.nextInt(rowCount))); + get.addColumn(family, qualifier); + gets.add(get); + } + for (final Table table : tables) { + table.get(gets); + } + opCount += tables.length; + } + } catch (IOException e) { + LOG.error("multiget failed after nRetries=" + opCount, e); + } + return opCount; + } + + static long doScans(int maxOps, Table table) { + int count = 0; + int caching = 100; + try { + Scan scan = new Scan(); + scan.setCaching(caching); + scan.setCacheBlocks(false); + ResultScanner scanner = table.getScanner(scan); + while (count < (maxOps * caching)) { + scanner.next(); + count += 1; + } + } catch (IOException e) { + LOG.error("scan failed after nRetries=" + count, e); + } + return count / caching; + } + static void triggerUserCacheRefresh(HBaseTestingUtil testUtil, boolean bypass, TableName... tables) throws Exception { triggerCacheRefresh(testUtil, bypass, true, false, false, false, false, tables); From 31a7882fa23c60640f911f1a697c2b27ed6c75ec Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Fri, 26 Jan 2024 08:04:43 -0500 Subject: [PATCH 2/7] set minBlockSizeBytes in constructor --- .../org/apache/hadoop/hbase/regionserver/HRegion.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 643baeb68150..ae4045b1216b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -458,7 +458,7 @@ public MetricsTableRequests getMetricsTableRequests() { private final CellComparator cellComparator; - private int minBlockSizeBytes = -1; + private final int minBlockSizeBytes; /** * @return The smallest mvcc readPoint across all the scanners in this region. 
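Initializing the minimum in the constructor computes it exactly once per region open and lets the field be final, avoiding the benign data race of the lazy version in patch 1. For illustration, a minimal standalone sketch of the same min-reduction (assumed inputs; the 64 KB fallback mirrors HConstants.DEFAULT_BLOCKSIZE, and the per-family sizes stand in for ColumnFamilyDescriptor.getBlocksize()):

    import java.util.Arrays;

    public class MinBlockSizeSketch {
      // Assumed constant mirroring HConstants.DEFAULT_BLOCKSIZE (64 KB).
      static final int DEFAULT_BLOCKSIZE = 64 * 1024;

      // Smallest configured block size across the table's column families,
      // falling back to the default when the table defines no families.
      static int minBlockSizeBytes(int[] familyBlockSizes) {
        return Arrays.stream(familyBlockSizes).min().orElse(DEFAULT_BLOCKSIZE);
      }

      public static void main(String[] args) {
        // e.g. families configured with 64 KB, 8 KB and 16 KB blocks
        System.out.println(minBlockSizeBytes(new int[] { 65536, 8192, 16384 })); // 8192
      }
    }
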
Writes older than @@ -918,6 +918,9 @@ public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration co .remove(getRegionInfo().getEncodedName()); } } + + minBlockSizeBytes = Arrays.stream(this.htableDescriptor.getColumnFamilies()) + .mapToInt(ColumnFamilyDescriptor::getBlocksize).min().orElse(HConstants.DEFAULT_BLOCKSIZE); } private void setHTableSpecificConf() { @@ -2051,11 +2054,6 @@ public Configuration getReadOnlyConfiguration() { @Override public int getMinBlockSizeBytes() { - if (minBlockSizeBytes > 0) { - return minBlockSizeBytes; - } - minBlockSizeBytes = Arrays.stream(this.htableDescriptor.getColumnFamilies()) - .mapToInt(ColumnFamilyDescriptor::getBlocksize).min().orElse(HConstants.DEFAULT_BLOCKSIZE); return minBlockSizeBytes; } From e7c8a0558b0f9e42a04e220bb79fb002c8c9abce Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Fri, 26 Jan 2024 08:08:24 -0500 Subject: [PATCH 3/7] fix warnings --- .../org/apache/hadoop/hbase/regionserver/RSRpcServices.java | 4 +++- .../hadoop/hbase/quotas/TestBlockBytesScannedQuota.java | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 7eacfd73398a..177179170cd5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -3653,7 +3653,9 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque } quota.addScanResult(results); - quota.addBlockBytesScanned(rpcCall.getBlockBytesScanned()); + if (rpcCall != null) { + quota.addBlockBytesScanned(rpcCall.getBlockBytesScanned()); + } addResults(builder, results, (HBaseRpcController) controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()), isClientCellBlockSupport(rpcCall)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java index 4f47fb6918c2..a8af6b9a1602 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java @@ -195,9 +195,9 @@ public void testBBSMultiGet() throws Exception { assertEquals(100, doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table)); assertEquals(100, doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table)); - // Add 100 block/min limit + // Add ~100 block/min limit admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, - Math.round(100 * blockSize), TimeUnit.MINUTES)); + Math.round(100.1 * blockSize), TimeUnit.MINUTES)); triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); // should execute approximately 10 batches of 10 requests From 433a6999285c9699934f3ebaa70fb25aacefdd44 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Sun, 28 Jan 2024 13:20:31 -0500 Subject: [PATCH 4/7] test cleanup --- .../hadoop/hbase/quotas/TestBlockBytesScannedQuota.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java index a8af6b9a1602..18249da18bf7 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java @@ -150,7 +150,7 @@ public void testBBSScan() throws Exception { assertEquals(100, doScans(100, table)); assertEquals(100, doScans(100, table)); - // Add ~3 block/min limit. This should support >0 scans + // Add ~3 block/min limit. This should support >1 scans admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, Math.round(3.1 * blockSize), TimeUnit.MINUTES)); triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); @@ -180,7 +180,7 @@ public void testBBSMultiGet() throws Exception { TEST_UTIL.flush(TABLE_NAME); // Add 1 block/min limit. - // This should only allow 1 multiget per minute, because we estimate 1 block per scan + // This should only allow 1 multiget per minute, because we estimate 1 block per multiget admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize, TimeUnit.MINUTES)); triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); From 7f660d47e40db35ddb9dde17f3dd3fcdf7fda41f Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Fri, 2 Feb 2024 17:21:47 -0500 Subject: [PATCH 5/7] Simplify blockBytesScanned derivation. Use max of BBS and result size --- .../hbase/quotas/DefaultOperationQuota.java | 16 +++++++--------- .../hadoop/hbase/quotas/NoopOperationQuota.java | 5 ----- .../hadoop/hbase/quotas/OperationQuota.java | 7 ------- .../hadoop/hbase/regionserver/RSRpcServices.java | 11 ----------- 4 files changed, 7 insertions(+), 32 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index f8a877579c79..8dcba809585f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -22,6 +22,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -49,7 +51,6 @@ public class DefaultOperationQuota implements OperationQuota { protected long readDiff = 0; protected long writeCapacityUnitDiff = 0; protected long readCapacityUnitDiff = 0; - private long blockBytesScanned = 0; private boolean useBlockBytesScanned; private long blockSizeBytes; @@ -102,11 +103,13 @@ public void close() { // Adjust the quota consumed for the specified operation writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed; + long resultSize = + operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()]; if (useBlockBytesScanned) { - readDiff = blockBytesScanned - readConsumed; + long blockBytesScanned = + RpcServer.getCurrentCall().map(RpcCall::getBlockBytesScanned).orElse(0L); + readDiff = Math.max(blockBytesScanned, resultSize) - readConsumed; } else { - long resultSize = - operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()]; readDiff = resultSize - readConsumed; } @@ -146,11 +149,6 @@ public void addMutation(final Mutation mutation) { operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation); } 
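Reading the counter off the current RpcCall avoids threading addBlockBytesScanned() through every RPC handler, and the Math.max guard charges the larger of block IO and result payload: a heavily filtered scan pays for the blocks it touched, while a call with no recorded block bytes (orElse(0L), e.g. outside an active RPC) still pays for what it returned. A standalone sketch of that charging rule under those assumptions, with names that are not part of the HBase API:

    import java.util.OptionalLong;

    public class ReadChargeSketch {
      // Charge whichever signal is larger, per the close() logic above.
      static long readCharge(OptionalLong blockBytesScanned, long resultSizeBytes) {
        return Math.max(blockBytesScanned.orElse(0L), resultSizeBytes);
      }

      public static void main(String[] args) {
        System.out.println(readCharge(OptionalLong.of(65536), 120)); // 65536: filtered scan
        System.out.println(readCharge(OptionalLong.empty(), 120));   // 120: no block IO recorded
      }
    }
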
- @Override - public void addBlockBytesScanned(long blockBytesScanned) { - this.blockBytesScanned += blockBytesScanned; - } - /** * Update estimate quota(read/write size/capacityUnits) which will be consumed * @param numWrites the number of write requests diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java index 178d459a1e8f..71fc169d671f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java @@ -64,11 +64,6 @@ public void addMutation(final Mutation mutation) { // no-op } - @Override - public void addBlockBytesScanned(long blockBytesScanned) { - // no-op - } - @Override public long getReadAvailable() { return Long.MAX_VALUE; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java index e67e48e73386..bcef0f9fedfb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java @@ -70,13 +70,6 @@ public enum OperationType { */ void addMutation(Mutation mutation); - /** - * Add the block bytes scanned for the given call. This may be used to calculate the exact quota, - * and can be a better representation of workload than result sizes. Set - * {@link #USE_BLOCK_BYTES_SCANNED_KEY} to true to prefer this metric over result size. - */ - void addBlockBytesScanned(long blockBytesScanned); - /** Returns the number of bytes available to read to avoid exceeding the quota */ long getReadAvailable(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 177179170cd5..0fe6f6476a6c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -690,7 +690,6 @@ private Result increment(final HRegion region, final OperationQuota quota, if (metricsRegionServer != null) { long blockBytesScanned = context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; - quota.addBlockBytesScanned(blockBytesScanned); metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before, blockBytesScanned); } @@ -2507,9 +2506,6 @@ public GetResponse get(final RpcController controller, final GetRequest request) if (r != null && r.rawCells() != null) { quota.addGetResult(r); } - if (context != null) { - quota.addBlockBytesScanned(context.getBlockBytesScanned()); - } return builder.build(); } catch (IOException ie) { throw new ServiceException(ie); @@ -2845,9 +2841,6 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request) spaceQuotaEnforcement); } } finally { - if (context != null) { - quota.addBlockBytesScanned(context.getBlockBytesScanned()); - } quota.close(); } @@ -3048,7 +3041,6 @@ private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota long after = EnvironmentEdgeManager.currentTime(); long blockBytesScanned = context != null ? 
context.getBlockBytesScanned() - blockBytesScannedBefore : 0; - quota.addBlockBytesScanned(blockBytesScanned); metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned); MutationType type = mutation.getMutateType(); @@ -3653,9 +3645,6 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque } quota.addScanResult(results); - if (rpcCall != null) { - quota.addBlockBytesScanned(rpcCall.getBlockBytesScanned()); - } addResults(builder, results, (HBaseRpcController) controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()), isClientCellBlockSupport(rpcCall)); From c36ab499381b980deb202781fbd871afefc3f692 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Fri, 2 Feb 2024 21:07:23 -0500 Subject: [PATCH 6/7] nit: remove duplicated write estimate logic --- .../org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index 8dcba809585f..4f8381a6810d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -156,13 +156,13 @@ public void addMutation(final Mutation mutation) { * @param numScans the number of scan requests */ protected void updateEstimateConsumeQuota(int numWrites, int numReads, int numScans) { + writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100); + if (useBlockBytesScanned) { - writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100); // assume 1 block required for reads. this is probably a low estimate, which is okay readConsumed = numReads > 0 ? blockSizeBytes : 0; readConsumed += numScans > 0 ? blockSizeBytes : 0; } else { - writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100); readConsumed = estimateConsume(OperationType.GET, numReads, 100); readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000); } From d1671960f8ec9414a42931348c390f44909e5cb3 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Tue, 6 Feb 2024 09:55:35 -0500 Subject: [PATCH 7/7] Use BBS as default. Makes tests a bit more defensive against flakiness --- .../hbase/quotas/DefaultOperationQuota.java | 20 +++--- .../hadoop/hbase/quotas/OperationQuota.java | 11 ++- .../quotas/TestBlockBytesScannedQuota.java | 69 +++++++++++-------- 3 files changed, 60 insertions(+), 40 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index 4f8381a6810d..4b89e18a8021 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -51,14 +51,14 @@ public class DefaultOperationQuota implements OperationQuota { protected long readDiff = 0; protected long writeCapacityUnitDiff = 0; protected long readCapacityUnitDiff = 0; - private boolean useBlockBytesScanned; + private boolean useResultSizeBytes; private long blockSizeBytes; public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes, final QuotaLimiter... 
limiters) { this(conf, Arrays.asList(limiters)); - this.useBlockBytesScanned = - conf.getBoolean(OperationQuota.USE_BLOCK_BYTES_SCANNED_KEY, USE_BLOCK_BYTES_SCANNED_DEFAULT); + this.useResultSizeBytes = + conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, USE_RESULT_SIZE_BYTES_DEFAULT); this.blockSizeBytes = blockSizeBytes; } @@ -105,12 +105,12 @@ public void close() { long resultSize = operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()]; - if (useBlockBytesScanned) { + if (useResultSizeBytes) { + readDiff = resultSize - readConsumed; + } else { long blockBytesScanned = RpcServer.getCurrentCall().map(RpcCall::getBlockBytesScanned).orElse(0L); readDiff = Math.max(blockBytesScanned, resultSize) - readConsumed; - } else { - readDiff = resultSize - readConsumed; } writeCapacityUnitDiff = @@ -158,13 +158,13 @@ public void addMutation(final Mutation mutation) { protected void updateEstimateConsumeQuota(int numWrites, int numReads, int numScans) { writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100); - if (useBlockBytesScanned) { + if (useResultSizeBytes) { + readConsumed = estimateConsume(OperationType.GET, numReads, 100); + readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000); + } else { // assume 1 block required for reads. this is probably a low estimate, which is okay readConsumed = numReads > 0 ? blockSizeBytes : 0; readConsumed += numScans > 0 ? blockSizeBytes : 0; - } else { - readConsumed = estimateConsume(OperationType.GET, numReads, 100); - readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000); } writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java index bcef0f9fedfb..e18d3eb34953 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java @@ -35,8 +35,15 @@ public enum OperationType { SCAN } - String USE_BLOCK_BYTES_SCANNED_KEY = "hbase.quota.use.block.bytes.scanned"; - boolean USE_BLOCK_BYTES_SCANNED_DEFAULT = false; + /** + * If false, the default, then IO based throttles will consume read availability based on the + * block bytes scanned by the given request. If true then IO based throttles will use result size + * rather than block bytes scanned. Using block bytes scanned should be preferable to using result + * size, because otherwise access patterns like heavily filtered scans may be able to produce a + * significant and effectively un-throttled workload. + */ + String USE_RESULT_SIZE_BYTES = "hbase.quota.use.result.size.bytes"; + boolean USE_RESULT_SIZE_BYTES_DEFAULT = false; /** * Checks if it is possible to execute the specified operation. 
The quota will be estimated based diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java index 18249da18bf7..e27ba123381c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java @@ -23,9 +23,8 @@ import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doScans; import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerUserCacheRefresh; import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.waitMinuteQuota; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; @@ -71,7 +70,6 @@ public static void setUpBeforeClass() throws Exception { // quotas enabled, using block bytes scanned TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME); - TEST_UTIL.getConfiguration().setBoolean(OperationQuota.USE_BLOCK_BYTES_SCANNED_KEY, true); // don't cache blocks to make IO predictable TEST_UTIL.getConfiguration().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); @@ -106,22 +104,22 @@ public void testBBSGet() throws Exception { TEST_UTIL.flush(TABLE_NAME); // Add ~10 block/min limit - admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, + admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.READ_SIZE, Math.round(10.1 * blockSize), TimeUnit.MINUTES)); triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); // should execute at max 10 requests - assertEquals(10, doGets(20, FAMILY, QUALIFIER, table)); + testTraffic(() -> doGets(20, FAMILY, QUALIFIER, table), 10, 1); // wait a minute and you should get another 10 requests executed waitMinuteQuota(); - assertEquals(10, doGets(20, FAMILY, QUALIFIER, table)); + testTraffic(() -> doGets(20, FAMILY, QUALIFIER, table), 10, 1); // Remove all the limits admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); - assertEquals(100, doGets(100, FAMILY, QUALIFIER, table)); - assertEquals(100, doGets(100, FAMILY, QUALIFIER, table)); + testTraffic(() -> doGets(100, FAMILY, QUALIFIER, table), 100, 0); + testTraffic(() -> doGets(100, FAMILY, QUALIFIER, table), 100, 0); } @Test @@ -142,13 +140,13 @@ public void testBBSScan() throws Exception { waitMinuteQuota(); // should execute 1 request - assertEquals(1, doScans(5, table)); + testTraffic(() -> doScans(5, table), 1, 0); // Remove all the limits admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); - assertEquals(100, doScans(100, table)); - assertEquals(100, doScans(100, table)); + testTraffic(() -> doScans(100, table), 100, 0); + testTraffic(() -> doScans(100, table), 100, 0); // Add ~3 block/min limit. 
This should support >1 scans admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, @@ -156,16 +154,13 @@ public void testBBSScan() throws Exception { triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); // should execute some requests, but not all - long successfulScans = doScans(100, table); - LOG.info("successfulScans = " + successfulScans); - assertTrue(successfulScans < 100); - assertTrue(successfulScans > 0); + testTraffic(() -> doScans(100, table), 100, 90); // Remove all the limits admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); - assertEquals(100, doScans(100, table)); - assertEquals(100, doScans(100, table)); + testTraffic(() -> doScans(100, table), 100, 0); + testTraffic(() -> doScans(100, table), 100, 0); } @Test @@ -187,13 +182,13 @@ public void testBBSMultiGet() throws Exception { waitMinuteQuota(); // should execute 1 request - assertEquals(1, doMultiGets(10, 10, rowCount, FAMILY, QUALIFIER, table)); + testTraffic(() -> doMultiGets(10, 10, rowCount, FAMILY, QUALIFIER, table), 1, 1); // Remove all the limits admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); - assertEquals(100, doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table)); - assertEquals(100, doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table)); + testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0); + testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0); // Add ~100 block/min limit admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, @@ -201,20 +196,38 @@ public void testBBSMultiGet() throws Exception { triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); // should execute approximately 10 batches of 10 requests - long successfulMultiGets = doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table); - assertTrue(successfulMultiGets >= 9); - assertTrue(successfulMultiGets <= 11); + testTraffic(() -> doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table), 10, 1); // wait a minute and you should get another ~10 batches of 10 requests waitMinuteQuota(); - successfulMultiGets = doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table); - assertTrue(successfulMultiGets >= 9); - assertTrue(successfulMultiGets <= 11); + testTraffic(() -> doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table), 10, 1); // Remove all the limits admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); - assertEquals(100, doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table)); - assertEquals(100, doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table)); + testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0); + testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0); + } + + private void testTraffic(Callable trafficCallable, long expectedSuccess, long marginOfError) + throws Exception { + TEST_UTIL.waitFor(90_000, () -> { + long actualSuccess; + try { + actualSuccess = trafficCallable.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } + LOG.info("Traffic test yielded {} successful requests. 
Expected {} +/- {}", actualSuccess, + expectedSuccess, marginOfError); + boolean success = (actualSuccess >= expectedSuccess - marginOfError) + && (actualSuccess <= expectedSuccess + marginOfError); + if (!success) { + triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); + waitMinuteQuota(); + Thread.sleep(15_000L); + } + return success; + }); } }
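
To make the test arithmetic concrete: with the block cache disabled and the table flushed, each point Get scans roughly one HFile block, so a read-size throttle of Math.round(10.1 * blockSize) bytes per minute admits about ten Gets before RpcThrottlingException, which is what testBBSGet asserts (and what testTraffic's margin of error absorbs). A toy accounting sketch under those assumptions, not the QuotaLimiter API:

    public class SizeThrottleSketch {
      private final long limitBytesPerMinute;
      private long consumedBytes;

      SizeThrottleSketch(long limitBytesPerMinute) {
        this.limitBytesPerMinute = limitBytesPerMinute;
      }

      // Admit a request if its block bytes still fit in this minute's budget.
      boolean tryConsume(long blockBytesScanned) {
        if (consumedBytes + blockBytesScanned > limitBytesPerMinute) {
          return false; // the real limiter throws RpcThrottlingException here
        }
        consumedBytes += blockBytesScanned;
        return true;
      }

      public static void main(String[] args) {
        long blockSize = 64 * 1024; // default HFile block size
        SizeThrottleSketch throttle = new SizeThrottleSketch(Math.round(10.1 * blockSize));
        int admitted = 0;
        for (int i = 0; i < 20; i++) {
          if (throttle.tryConsume(blockSize)) { // each Get scans ~1 block
            admitted++;
          }
        }
        System.out.println(admitted); // 10, matching testBBSGet
      }
    }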