HubSpot Backport: HBASE-27687 Enhance quotas to consume blockBytesScanned rather than response size (apache#5654)
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
rmdmattingly authored and bbeaudreault committed Feb 6, 2024
1 parent 13c8ba0 commit a4159d3
Showing 8 changed files with 366 additions and 16 deletions.

DefaultOperationQuota.java
@@ -22,6 +22,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;

@@ -49,9 +51,15 @@ public class DefaultOperationQuota implements OperationQuota {
   protected long readDiff = 0;
   protected long writeCapacityUnitDiff = 0;
   protected long readCapacityUnitDiff = 0;
+  private boolean useResultSizeBytes;
+  private long blockSizeBytes;
 
-  public DefaultOperationQuota(final Configuration conf, final QuotaLimiter... limiters) {
+  public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes,
+    final QuotaLimiter... limiters) {
     this(conf, Arrays.asList(limiters));
+    this.useResultSizeBytes =
+      conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, USE_RESULT_SIZE_BYTES_DEFAULT);
+    this.blockSizeBytes = blockSizeBytes;
   }
 
   /**
@@ -94,8 +102,17 @@ public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThro
   public void close() {
     // Adjust the quota consumed for the specified operation
     writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
-    readDiff = operationSize[OperationType.GET.ordinal()]
-      + operationSize[OperationType.SCAN.ordinal()] - readConsumed;
+
+    long resultSize =
+      operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()];
+    if (useResultSizeBytes) {
+      readDiff = resultSize - readConsumed;
+    } else {
+      long blockBytesScanned =
+        RpcServer.getCurrentCall().map(RpcCall::getBlockBytesScanned).orElse(0L);
+      readDiff = Math.max(blockBytesScanned, resultSize) - readConsumed;
+    }
+
     writeCapacityUnitDiff =
       calculateWriteCapacityUnitDiff(operationSize[OperationType.MUTATE.ordinal()], writeConsumed);
     readCapacityUnitDiff = calculateReadCapacityUnitDiff(
@@ -140,8 +157,15 @@ public void addMutation(final Mutation mutation) {
    */
   protected void updateEstimateConsumeQuota(int numWrites, int numReads, int numScans) {
     writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
-    readConsumed = estimateConsume(OperationType.GET, numReads, 100);
-    readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
+
+    if (useResultSizeBytes) {
+      readConsumed = estimateConsume(OperationType.GET, numReads, 100);
+      readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
+    } else {
+      // assume 1 block required for reads. this is probably a low estimate, which is okay
+      readConsumed = numReads > 0 ? blockSizeBytes : 0;
+      readConsumed += numScans > 0 ? blockSizeBytes : 0;
+    }
 
     writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
     readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
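
To make the new accounting concrete, here is a small standalone sketch, not part of the commit and using made-up sizes, of how the estimate charged in updateEstimateConsumeQuota and the adjustment applied in close() interact for a heavily filtered scan:

    public class ReadDiffSketch {
      public static void main(String[] args) {
        // Up-front estimate from updateEstimateConsumeQuota: one block, here 64 KB.
        long readConsumed = 64 * 1024;

        // Hypothetical values available by the time close() runs:
        long resultSize = 2 * 1024;          // 2 KB actually returned to the client
        long blockBytesScanned = 512 * 1024; // 512 KB of blocks read to produce it

        // Default accounting: charge the larger of the two, less the estimate already consumed.
        long readDiff = Math.max(blockBytesScanned, resultSize) - readConsumed; // 458752

        // Legacy accounting (hbase.quota.use.result.size.bytes=true) would instead refund
        // most of the estimate, leaving the filtered scan effectively un-throttled.
        long legacyReadDiff = resultSize - readConsumed; // -63488

        System.out.println(readDiff + " vs " + legacyReadDiff);
      }
    }
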

ExceedOperationQuota.java
@@ -40,9 +40,9 @@ public class ExceedOperationQuota extends DefaultOperationQuota {
   private static final Logger LOG = LoggerFactory.getLogger(ExceedOperationQuota.class);
   private QuotaLimiter regionServerLimiter;
 
-  public ExceedOperationQuota(final Configuration conf, QuotaLimiter regionServerLimiter,
-    final QuotaLimiter... limiters) {
-    super(conf, limiters);
+  public ExceedOperationQuota(final Configuration conf, int blockSizeBytes,
+    QuotaLimiter regionServerLimiter, final QuotaLimiter... limiters) {
+    super(conf, blockSizeBytes, limiters);
     this.regionServerLimiter = regionServerLimiter;
   }

OperationQuota.java
@@ -35,6 +35,16 @@ public enum OperationType {
     SCAN
   }
 
+  /**
+   * If false, the default, then IO based throttles will consume read availability based on the
+   * block bytes scanned by the given request. If true then IO based throttles will use result size
+   * rather than block bytes scanned. Using block bytes scanned should be preferable to using result
+   * size, because otherwise access patterns like heavily filtered scans may be able to produce a
+   * significant and effectively un-throttled workload.
+   */
+  String USE_RESULT_SIZE_BYTES = "hbase.quota.use.result.size.bytes";
+  boolean USE_RESULT_SIZE_BYTES_DEFAULT = false;
+
   /**
    * Checks if it is possible to execute the specified operation. The quota will be estimated based
    * on the number of operations to perform and the average size accumulated during time.
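
The property above is read by DefaultOperationQuota on the region server, normally from hbase-site.xml. A minimal sketch, assuming the standard classpath-backed configuration, of how the legacy result-size accounting could be switched back on; the surrounding setup is illustrative, not part of the commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class QuotaAccountingConfig {
      public static void main(String[] args) {
        // Loads hbase-default.xml / hbase-site.xml from the classpath.
        Configuration conf = HBaseConfiguration.create();

        // false (the default) => throttle read IO by block bytes scanned.
        // true => fall back to the pre-HBASE-27687 result-size accounting.
        conf.setBoolean("hbase.quota.use.result.size.bytes", true);

        System.out.println(conf.getBoolean("hbase.quota.use.result.size.bytes", false));
      }
    }
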

RegionServerRpcQuotaManager.java
@@ -21,6 +21,7 @@
 import java.util.List;
 import java.util.Optional;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.ipc.RpcScheduler;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.regionserver.Region;
@@ -113,7 +114,8 @@ QuotaCache getQuotaCache() {
    * @param table the table where the operation will be executed
    * @return the OperationQuota
    */
-  public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) {
+  public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table,
+    final int blockSizeBytes) {
     if (isQuotaEnabled() && !table.isSystemTable() && isRpcThrottleEnabled()) {
       UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
       QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
@@ -123,7 +125,8 @@ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName t
         LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
       }
       if (!useNoop) {
-        return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter);
+        return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
+          userLimiter);
       }
     } else {
       QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
@@ -139,11 +142,11 @@ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName t
       }
       if (!useNoop) {
         if (exceedThrottleQuotaEnabled) {
-          return new ExceedOperationQuota(this.rsServices.getConfiguration(), rsLimiter,
-            userLimiter, tableLimiter, nsLimiter);
+          return new ExceedOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
+            rsLimiter, userLimiter, tableLimiter, nsLimiter);
         } else {
-          return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter,
-            tableLimiter, nsLimiter, rsLimiter);
+          return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
+            userLimiter, tableLimiter, nsLimiter, rsLimiter);
         }
       }
     }
@@ -213,9 +216,10 @@ private OperationQuota checkQuota(final Region region, final int numWrites, fina
     } else {
       ugi = User.getCurrent().getUGI();
     }
-    TableName table = region.getTableDescriptor().getTableName();
+    TableDescriptor tableDescriptor = region.getTableDescriptor();
+    TableName table = tableDescriptor.getTableName();
 
-    OperationQuota quota = getQuota(ugi, table);
+    OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes());
     try {
       quota.checkQuota(numWrites, numReads, numScans);
     } catch (RpcThrottlingException e) {
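
For context, the read throttles whose budget this code manages are typically installed through the Admin quota API; with this change that budget is consumed by block bytes scanned by default. A hedged sketch using standard client API calls, with a made-up user name and limit:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.quotas.QuotaSettingsFactory;
    import org.apache.hadoop.hbase.quotas.ThrottleType;

    public class ReadSizeThrottleExample {
      public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
          Admin admin = conn.getAdmin()) {
          // Allow the (hypothetical) user "analytics" 10 MB of read IO per second.
          // That limit is now charged against block bytes scanned rather than response size.
          admin.setQuota(QuotaSettingsFactory.throttleUser("analytics", ThrottleType.READ_SIZE,
            10L * 1024 * 1024, TimeUnit.SECONDS));
        }
      }
    }
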

HRegion.java
@@ -438,6 +438,8 @@ public MetricsTableRequests getMetricsTableRequests() {
 
   private final CellComparator cellComparator;
 
+  private final int minBlockSizeBytes;
+
   /**
    * @return The smallest mvcc readPoint across all the scanners in this region. Writes older than
    *         this readPoint, are included in every read operation.
@@ -900,6 +902,9 @@ public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration co
         .remove(getRegionInfo().getEncodedName());
       }
     }
+
+    minBlockSizeBytes = Arrays.stream(this.htableDescriptor.getColumnFamilies())
+      .mapToInt(ColumnFamilyDescriptor::getBlocksize).min().orElse(HConstants.DEFAULT_BLOCKSIZE);
   }
 
   private void setHTableSpecificConf() {
@@ -1941,6 +1946,11 @@ public Configuration getReadOnlyConfiguration() {
     return new ReadOnlyConfiguration(this.conf);
   }
 
+  @Override
+  public int getMinBlockSizeBytes() {
+    return minBlockSizeBytes;
+  }
+
   private ThreadPoolExecutor getStoreOpenAndCloseThreadPool(final String threadNamePrefix) {
     int numStores = Math.max(1, this.htableDescriptor.getColumnFamilyCount());
     int maxThreads = Math.min(numStores, conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
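
The per-region minimum comes from the column family BLOCKSIZE settings. As a rough illustration with an invented table and family names, the same computation can be reproduced against a TableDescriptor built client-side:

    import java.util.Arrays;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
    import org.apache.hadoop.hbase.client.TableDescriptor;
    import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MinBlockSizeExample {
      public static void main(String[] args) {
        // Hypothetical table with two families configured with different block sizes.
        TableDescriptor table = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"))
          .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("small"))
            .setBlocksize(16 * 1024).build())
          .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("wide"))
            .setBlocksize(128 * 1024).build())
          .build();

        // Mirrors the computation added to the HRegion constructor: take the smallest
        // configured family block size, falling back to HConstants.DEFAULT_BLOCKSIZE (64 KB).
        int minBlockSizeBytes = Arrays.stream(table.getColumnFamilies())
          .mapToInt(cf -> cf.getBlocksize()).min().orElse(HConstants.DEFAULT_BLOCKSIZE);

        System.out.println("minBlockSizeBytes=" + minBlockSizeBytes); // 16384
      }
    }
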

Region.java
@@ -611,4 +611,10 @@ void requestCompaction(byte[] family, String why, int priority, boolean major,
    * if you try to set a configuration.
    */
   Configuration getReadOnlyConfiguration();
+
+  /**
+   * The minimum block size configuration from all relevant column families. This is used when
+   * estimating quota consumption.
+   */
+  int getMinBlockSizeBytes();
 }
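
Since getMinBlockSizeBytes() is now part of the Region interface, other server-side code such as coprocessors can read it as well. A small, hypothetical observer (not part of this commit) that logs the value when a region opens:

    import java.util.Optional;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.coprocessor.RegionObserver;

    public class MinBlockSizeLoggingObserver implements RegionCoprocessor, RegionObserver {
      @Override
      public Optional<RegionObserver> getRegionObserver() {
        return Optional.of(this);
      }

      @Override
      public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
        // getMinBlockSizeBytes() is the accessor added in this commit; the observer itself
        // is only an illustration of consuming it through the Region interface.
        int minBlockSizeBytes = c.getEnvironment().getRegion().getMinBlockSizeBytes();
        System.out.println("minBlockSizeBytes=" + minBlockSizeBytes);
      }
    }
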