diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index ddf804243ed8..4b89e18a8021 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -22,6 +22,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -49,9 +51,15 @@ public class DefaultOperationQuota implements OperationQuota { protected long readDiff = 0; protected long writeCapacityUnitDiff = 0; protected long readCapacityUnitDiff = 0; + private boolean useResultSizeBytes; + private long blockSizeBytes; - public DefaultOperationQuota(final Configuration conf, final QuotaLimiter... limiters) { + public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes, + final QuotaLimiter... limiters) { this(conf, Arrays.asList(limiters)); + this.useResultSizeBytes = + conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, USE_RESULT_SIZE_BYTES_DEFAULT); + this.blockSizeBytes = blockSizeBytes; } /** @@ -94,8 +102,17 @@ public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThro public void close() { // Adjust the quota consumed for the specified operation writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed; - readDiff = operationSize[OperationType.GET.ordinal()] - + operationSize[OperationType.SCAN.ordinal()] - readConsumed; + + long resultSize = + operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()]; + if (useResultSizeBytes) { + readDiff = resultSize - readConsumed; + } else { + long blockBytesScanned = + RpcServer.getCurrentCall().map(RpcCall::getBlockBytesScanned).orElse(0L); + readDiff = Math.max(blockBytesScanned, resultSize) - readConsumed; + } + writeCapacityUnitDiff = calculateWriteCapacityUnitDiff(operationSize[OperationType.MUTATE.ordinal()], writeConsumed); readCapacityUnitDiff = calculateReadCapacityUnitDiff( @@ -140,8 +157,15 @@ public void addMutation(final Mutation mutation) { */ protected void updateEstimateConsumeQuota(int numWrites, int numReads, int numScans) { writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100); - readConsumed = estimateConsume(OperationType.GET, numReads, 100); - readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000); + + if (useResultSizeBytes) { + readConsumed = estimateConsume(OperationType.GET, numReads, 100); + readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000); + } else { + // assume 1 block required for reads. this is probably a low estimate, which is okay + readConsumed = numReads > 0 ? blockSizeBytes : 0; + readConsumed += numScans > 0 ? blockSizeBytes : 0; + } writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed); readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java index 1b7200f5f22f..1788e550f22a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java @@ -40,9 +40,9 @@ public class ExceedOperationQuota extends DefaultOperationQuota { private static final Logger LOG = LoggerFactory.getLogger(ExceedOperationQuota.class); private QuotaLimiter regionServerLimiter; - public ExceedOperationQuota(final Configuration conf, QuotaLimiter regionServerLimiter, - final QuotaLimiter... limiters) { - super(conf, limiters); + public ExceedOperationQuota(final Configuration conf, int blockSizeBytes, + QuotaLimiter regionServerLimiter, final QuotaLimiter... limiters) { + super(conf, blockSizeBytes, limiters); this.regionServerLimiter = regionServerLimiter; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java index aaae64b6184a..e18d3eb34953 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java @@ -35,6 +35,16 @@ public enum OperationType { SCAN } + /** + * If false, the default, then IO based throttles will consume read availability based on the + * block bytes scanned by the given request. If true then IO based throttles will use result size + * rather than block bytes scanned. Using block bytes scanned should be preferable to using result + * size, because otherwise access patterns like heavily filtered scans may be able to produce a + * significant and effectively un-throttled workload. + */ + String USE_RESULT_SIZE_BYTES = "hbase.quota.use.result.size.bytes"; + boolean USE_RESULT_SIZE_BYTES_DEFAULT = false; + /** * Checks if it is possible to execute the specified operation. The quota will be estimated based * on the number of operations to perform and the average size accumulated during time. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java index 4b09c0308f9e..de76303e27ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Optional; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.ipc.RpcScheduler; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.regionserver.Region; @@ -113,7 +114,8 @@ QuotaCache getQuotaCache() { * @param table the table where the operation will be executed * @return the OperationQuota */ - public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) { + public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table, + final int blockSizeBytes) { if (isQuotaEnabled() && !table.isSystemTable() && isRpcThrottleEnabled()) { UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi); QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table); @@ -123,7 +125,8 @@ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName t LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter); } if (!useNoop) { - return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter); + return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes, + userLimiter); } } else { QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString()); @@ -139,11 +142,11 @@ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName t } if (!useNoop) { if (exceedThrottleQuotaEnabled) { - return new ExceedOperationQuota(this.rsServices.getConfiguration(), rsLimiter, - userLimiter, tableLimiter, nsLimiter); + return new ExceedOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes, + rsLimiter, userLimiter, tableLimiter, nsLimiter); } else { - return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter, - tableLimiter, nsLimiter, rsLimiter); + return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes, + userLimiter, tableLimiter, nsLimiter, rsLimiter); } } } @@ -213,9 +216,10 @@ private OperationQuota checkQuota(final Region region, final int numWrites, fina } else { ugi = User.getCurrent().getUGI(); } - TableName table = region.getTableDescriptor().getTableName(); + TableDescriptor tableDescriptor = region.getTableDescriptor(); + TableName table = tableDescriptor.getTableName(); - OperationQuota quota = getQuota(ugi, table); + OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes()); try { quota.checkQuota(numWrites, numReads, numScans); } catch (RpcThrottlingException e) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c775dbb71534..e3b1f1ea3edb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -438,6 +438,8 @@ public MetricsTableRequests getMetricsTableRequests() { private final CellComparator cellComparator; + private final int minBlockSizeBytes; + /** * @return The smallest mvcc readPoint across all the scanners in this region. Writes older than * this readPoint, are included in every read operation. @@ -900,6 +902,9 @@ public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration co .remove(getRegionInfo().getEncodedName()); } } + + minBlockSizeBytes = Arrays.stream(this.htableDescriptor.getColumnFamilies()) + .mapToInt(ColumnFamilyDescriptor::getBlocksize).min().orElse(HConstants.DEFAULT_BLOCKSIZE); } private void setHTableSpecificConf() { @@ -1941,6 +1946,11 @@ public Configuration getReadOnlyConfiguration() { return new ReadOnlyConfiguration(this.conf); } + @Override + public int getMinBlockSizeBytes() { + return minBlockSizeBytes; + } + private ThreadPoolExecutor getStoreOpenAndCloseThreadPool(final String threadNamePrefix) { int numStores = Math.max(1, this.htableDescriptor.getColumnFamilyCount()); int maxThreads = Math.min(numStores, conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 11cf99b9deec..14cea2194b6d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -611,4 +611,10 @@ void requestCompaction(byte[] family, String why, int priority, boolean major, * if you try to set a configuration. */ Configuration getReadOnlyConfiguration(); + + /** + * The minimum block size configuration from all relevant column families. This is used when + * estimating quota consumption. + */ + int getMinBlockSizeBytes(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java new file mode 100644 index 000000000000..d4ea24c7af21 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doGets; +import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doMultiGets; +import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doPuts; +import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doScans; +import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerUserCacheRefresh; +import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.waitMinuteQuota; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestBlockBytesScannedQuota { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBlockBytesScannedQuota.class); + + private final static Logger LOG = LoggerFactory.getLogger(TestBlockBytesScannedQuota.class); + + private static final int REFRESH_TIME = 5000; + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final byte[] FAMILY = Bytes.toBytes("cf"); + private static final byte[] QUALIFIER = Bytes.toBytes("q"); + + private static final TableName TABLE_NAME = TableName.valueOf("BlockBytesScannedQuotaTest"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // client should fail fast + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 10); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + + // quotas enabled, using block bytes scanned + TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME); + + // don't cache blocks to make IO predictable + TEST_UTIL.getConfiguration().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); + + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + QuotaCache.TEST_FORCE_REFRESH = true; + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + EnvironmentEdgeManager.reset(); + TEST_UTIL.deleteTable(TABLE_NAME); + TEST_UTIL.shutdownMiniCluster(); + } + + @After + public void tearDown() throws Exception { + ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL); + } + + @Test + public void testBBSGet() throws Exception { + final Admin admin = TEST_UTIL.getAdmin(); + final String userName = User.getCurrent().getShortName(); + int blockSize = admin.getDescriptor(TABLE_NAME).getColumnFamily(FAMILY).getBlocksize(); + Table table = admin.getConnection().getTable(TABLE_NAME); + + doPuts(10_000, FAMILY, QUALIFIER, table); + TEST_UTIL.flush(TABLE_NAME); + + // Add ~10 block/min limit + admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.READ_SIZE, + Math.round(10.1 * blockSize), TimeUnit.MINUTES)); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + + // should execute at max 10 requests + testTraffic(() -> doGets(20, FAMILY, QUALIFIER, table), 10, 1); + + // wait a minute and you should get another 10 requests executed + waitMinuteQuota(); + testTraffic(() -> doGets(20, FAMILY, QUALIFIER, table), 10, 1); + + // Remove all the limits + admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); + triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); + testTraffic(() -> doGets(100, FAMILY, QUALIFIER, table), 100, 0); + testTraffic(() -> doGets(100, FAMILY, QUALIFIER, table), 100, 0); + } + + @Test + public void testBBSScan() throws Exception { + final Admin admin = TEST_UTIL.getAdmin(); + final String userName = User.getCurrent().getShortName(); + int blockSize = admin.getDescriptor(TABLE_NAME).getColumnFamily(FAMILY).getBlocksize(); + Table table = admin.getConnection().getTable(TABLE_NAME); + + doPuts(10_000, FAMILY, QUALIFIER, table); + TEST_UTIL.flush(TABLE_NAME); + + // Add 1 block/min limit. + // This should only allow 1 scan per minute, because we estimate 1 block per scan + admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize, + TimeUnit.MINUTES)); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + waitMinuteQuota(); + + // should execute 1 request + testTraffic(() -> doScans(5, table), 1, 0); + + // Remove all the limits + admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); + triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); + testTraffic(() -> doScans(100, table), 100, 0); + testTraffic(() -> doScans(100, table), 100, 0); + + // Add ~3 block/min limit. This should support >1 scans + admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, + Math.round(3.1 * blockSize), TimeUnit.MINUTES)); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + + // should execute some requests, but not all + testTraffic(() -> doScans(100, table), 100, 90); + + // Remove all the limits + admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); + triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); + testTraffic(() -> doScans(100, table), 100, 0); + testTraffic(() -> doScans(100, table), 100, 0); + } + + @Test + public void testBBSMultiGet() throws Exception { + final Admin admin = TEST_UTIL.getAdmin(); + final String userName = User.getCurrent().getShortName(); + int blockSize = admin.getDescriptor(TABLE_NAME).getColumnFamily(FAMILY).getBlocksize(); + Table table = admin.getConnection().getTable(TABLE_NAME); + int rowCount = 10_000; + + doPuts(rowCount, FAMILY, QUALIFIER, table); + TEST_UTIL.flush(TABLE_NAME); + + // Add 1 block/min limit. + // This should only allow 1 multiget per minute, because we estimate 1 block per multiget + admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize, + TimeUnit.MINUTES)); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + waitMinuteQuota(); + + // should execute 1 request + testTraffic(() -> doMultiGets(10, 10, rowCount, FAMILY, QUALIFIER, table), 1, 1); + + // Remove all the limits + admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); + triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); + testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0); + testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0); + + // Add ~100 block/min limit + admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, + Math.round(100.1 * blockSize), TimeUnit.MINUTES)); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + + // should execute approximately 10 batches of 10 requests + testTraffic(() -> doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table), 10, 1); + + // wait a minute and you should get another ~10 batches of 10 requests + waitMinuteQuota(); + testTraffic(() -> doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table), 10, 1); + + // Remove all the limits + admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); + triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); + testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0); + testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0); + } + + private void testTraffic(Callable trafficCallable, long expectedSuccess, long marginOfError) + throws Exception { + TEST_UTIL.waitFor(90_000, () -> { + long actualSuccess; + try { + actualSuccess = trafficCallable.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } + LOG.info("Traffic test yielded {} successful requests. Expected {} +/- {}", actualSuccess, + expectedSuccess, marginOfError); + boolean success = (actualSuccess >= expectedSuccess - marginOfError) + && (actualSuccess <= expectedSuccess + marginOfError); + if (!success) { + triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); + waitMinuteQuota(); + Thread.sleep(15_000L); + } + return success; + }); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java index baf11df848cf..0bee1bf7c7a2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java @@ -18,12 +18,17 @@ package org.apache.hadoop.hbase.quotas; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; +import java.util.Random; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; @@ -105,6 +110,64 @@ static long doGets(int maxOps, final Table... tables) { return count; } + static long doGets(int maxOps, byte[] family, byte[] qualifier, final Table... tables) { + int count = 0; + try { + while (count < maxOps) { + Get get = new Get(Bytes.toBytes("row-" + count)); + get.addColumn(family, qualifier); + for (final Table table : tables) { + table.get(get); + } + count += tables.length; + } + } catch (IOException e) { + LOG.error("get failed after nRetries=" + count, e); + } + return count; + } + + static long doMultiGets(int maxOps, int batchSize, int rowCount, byte[] family, byte[] qualifier, + final Table... tables) { + int opCount = 0; + Random random = new Random(); + try { + while (opCount < maxOps) { + List gets = new ArrayList<>(batchSize); + while (gets.size() < batchSize) { + Get get = new Get(Bytes.toBytes("row-" + random.nextInt(rowCount))); + get.addColumn(family, qualifier); + gets.add(get); + } + for (final Table table : tables) { + table.get(gets); + } + opCount += tables.length; + } + } catch (IOException e) { + LOG.error("multiget failed after nRetries=" + opCount, e); + } + return opCount; + } + + static long doScans(int maxOps, Table table) { + int count = 0; + int caching = 100; + try { + Scan scan = new Scan(); + scan.setCaching(caching); + scan.setCacheBlocks(false); + ResultScanner scanner = table.getScanner(scan); + while (count < (maxOps * caching)) { + scanner.next(); + count += 1; + } + } catch (IOException e) { + LOG.error("scan failed after nRetries=" + count, e); + } + return count / caching; + } + static void triggerUserCacheRefresh(HBaseTestingUtility testUtil, boolean bypass, TableName... tables) throws Exception { triggerCacheRefresh(testUtil, bypass, true, false, false, false, false, tables);