Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27687: support consumption of block bytes scanned in operation quota #5654

Merged
merged 7 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

Expand Down Expand Up @@ -49,9 +51,15 @@ public class DefaultOperationQuota implements OperationQuota {
protected long readDiff = 0;
protected long writeCapacityUnitDiff = 0;
protected long readCapacityUnitDiff = 0;
private boolean useResultSizeBytes;
private long blockSizeBytes;

public DefaultOperationQuota(final Configuration conf, final QuotaLimiter... limiters) {
public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes,
final QuotaLimiter... limiters) {
this(conf, Arrays.asList(limiters));
this.useResultSizeBytes =
conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, USE_RESULT_SIZE_BYTES_DEFAULT);
this.blockSizeBytes = blockSizeBytes;
}

/**
Expand Down Expand Up @@ -94,8 +102,17 @@ public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThro
public void close() {
// Adjust the quota consumed for the specified operation
writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
readDiff = operationSize[OperationType.GET.ordinal()]
+ operationSize[OperationType.SCAN.ordinal()] - readConsumed;

long resultSize =
operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()];
if (useResultSizeBytes) {
readDiff = resultSize - readConsumed;
} else {
long blockBytesScanned =
RpcServer.getCurrentCall().map(RpcCall::getBlockBytesScanned).orElse(0L);
readDiff = Math.max(blockBytesScanned, resultSize) - readConsumed;
}

writeCapacityUnitDiff =
calculateWriteCapacityUnitDiff(operationSize[OperationType.MUTATE.ordinal()], writeConsumed);
readCapacityUnitDiff = calculateReadCapacityUnitDiff(
Expand Down Expand Up @@ -140,8 +157,15 @@ public void addMutation(final Mutation mutation) {
*/
protected void updateEstimateConsumeQuota(int numWrites, int numReads, int numScans) {
writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
readConsumed = estimateConsume(OperationType.GET, numReads, 100);
readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);

if (useResultSizeBytes) {
readConsumed = estimateConsume(OperationType.GET, numReads, 100);
readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
} else {
// assume 1 block required for reads. this is probably a low estimate, which is okay
readConsumed = numReads > 0 ? blockSizeBytes : 0;
readConsumed += numScans > 0 ? blockSizeBytes : 0;
bbeaudreault marked this conversation as resolved.
Show resolved Hide resolved
}

writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ public class ExceedOperationQuota extends DefaultOperationQuota {
private static final Logger LOG = LoggerFactory.getLogger(ExceedOperationQuota.class);
private QuotaLimiter regionServerLimiter;

public ExceedOperationQuota(final Configuration conf, QuotaLimiter regionServerLimiter,
final QuotaLimiter... limiters) {
super(conf, limiters);
public ExceedOperationQuota(final Configuration conf, int blockSizeBytes,
QuotaLimiter regionServerLimiter, final QuotaLimiter... limiters) {
super(conf, blockSizeBytes, limiters);
this.regionServerLimiter = regionServerLimiter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ public enum OperationType {
SCAN
}

/**
* If false, the default, then IO based throttles will consume read availability based on the
* block bytes scanned by the given request. If true then IO based throttles will use result size
* rather than block bytes scanned. Using block bytes scanned should be preferable to using result
* size, because otherwise access patterns like heavily filtered scans may be able to produce a
* significant and effectively un-throttled workload.
*/
String USE_RESULT_SIZE_BYTES = "hbase.quota.use.result.size.bytes";
boolean USE_RESULT_SIZE_BYTES_DEFAULT = false;

/**
* Checks if it is possible to execute the specified operation. The quota will be estimated based
* on the number of operations to perform and the average size accumulated during time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.Region;
Expand Down Expand Up @@ -113,7 +114,8 @@ QuotaCache getQuotaCache() {
* @param table the table where the operation will be executed
* @return the OperationQuota
*/
public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) {
public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table,
final int blockSizeBytes) {
if (isQuotaEnabled() && !table.isSystemTable() && isRpcThrottleEnabled()) {
UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
Expand All @@ -123,7 +125,8 @@ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName t
LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
}
if (!useNoop) {
return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter);
return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
userLimiter);
}
} else {
QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
Expand All @@ -139,11 +142,11 @@ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName t
}
if (!useNoop) {
if (exceedThrottleQuotaEnabled) {
return new ExceedOperationQuota(this.rsServices.getConfiguration(), rsLimiter,
userLimiter, tableLimiter, nsLimiter);
return new ExceedOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
rsLimiter, userLimiter, tableLimiter, nsLimiter);
} else {
return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter,
tableLimiter, nsLimiter, rsLimiter);
return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
userLimiter, tableLimiter, nsLimiter, rsLimiter);
}
}
}
Expand Down Expand Up @@ -213,9 +216,10 @@ private OperationQuota checkQuota(final Region region, final int numWrites, fina
} else {
ugi = User.getCurrent().getUGI();
}
TableName table = region.getTableDescriptor().getTableName();
TableDescriptor tableDescriptor = region.getTableDescriptor();
TableName table = tableDescriptor.getTableName();

OperationQuota quota = getQuota(ugi, table);
OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes());
try {
quota.checkQuota(numWrites, numReads, numScans);
} catch (RpcThrottlingException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,8 @@ public MetricsTableRequests getMetricsTableRequests() {

private final CellComparator cellComparator;

private final int minBlockSizeBytes;

/**
* @return The smallest mvcc readPoint across all the scanners in this region. Writes older than
* this readPoint, are included in every read operation.
Expand Down Expand Up @@ -916,6 +918,9 @@ public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration co
.remove(getRegionInfo().getEncodedName());
}
}

minBlockSizeBytes = Arrays.stream(this.htableDescriptor.getColumnFamilies())
.mapToInt(ColumnFamilyDescriptor::getBlocksize).min().orElse(HConstants.DEFAULT_BLOCKSIZE);
}

private void setHTableSpecificConf() {
Expand Down Expand Up @@ -2047,6 +2052,11 @@ public Configuration getReadOnlyConfiguration() {
return new ReadOnlyConfiguration(this.conf);
}

@Override
public int getMinBlockSizeBytes() {
return minBlockSizeBytes;
}

private ThreadPoolExecutor getStoreOpenAndCloseThreadPool(final String threadNamePrefix) {
int numStores = Math.max(1, this.htableDescriptor.getColumnFamilyCount());
int maxThreads = Math.min(numStores, conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,4 +571,10 @@ void requestCompaction(byte[] family, String why, int priority, boolean major,
* if you try to set a configuration.
*/
Configuration getReadOnlyConfiguration();

/**
* The minimum block size configuration from all relevant column families. This is used when
* estimating quota consumption.
*/
int getMinBlockSizeBytes();
}
Loading