HBASE-28107 Limit max count of rows filtered per scan request. #5428

Open · wants to merge 7 commits into base: master
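For context while reviewing: a minimal sketch (not part of the diff) of how an operator might enable the two configuration keys this change introduces. The class name and the values are illustrative only; both keys default to off (0 / false).

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ScannerFilterLimitConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Cap on the rows a single scan RPC may filter out; 0 (the default) disables the limit.
    conf.setLong("hbase.server.scanner.max.rows.filtered.per.request", 100_000L);
    // When true, a scan that hits the cap fails with DoNotRetryIOException instead of
    // only being cut short and answered with a heartbeat.
    conf.setBoolean("hbase.server.scanner.max.rows.filtered.reached.request.killed", true);
  }
}
```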
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -379,6 +379,8 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>

private final RegionServerAccounting regionServerAccounting;

private final RegionScannerLimiter regionScannerLimiter;

private NamedQueueServiceChore namedQueueServiceChore = null;

// Block cache
@@ -521,6 +523,7 @@ public HRegionServer(final Configuration conf) throws IOException {
HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);

regionServerAccounting = new RegionServerAccounting(conf);
regionScannerLimiter = new RegionScannerLimiter(conf);

blockCache = BlockCacheFactory.createBlockCache(conf);
mobFileCache = new MobFileCache(conf);
@@ -2078,6 +2081,7 @@ private void registerConfigurationObservers() {
configurationManager.registerObserver(this.cacheFlusher);
configurationManager.registerObserver(this.rpcServices);
configurationManager.registerObserver(this);
configurationManager.registerObserver(regionScannerLimiter);
}

/*
@@ -3637,4 +3641,8 @@ protected void stopChores() {
public RegionReplicationBufferManager getRegionReplicationBufferManager() {
return regionReplicationBufferManager;
}

public RegionScannerLimiter getRegionScannerLimiter() {
return regionScannerLimiter;
}
}
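Aside (not part of the diff): with the new getter, any component that holds a RegionServerServices reference can reach the limiter the same way RegionScannerImpl does further down. A minimal sketch of that access pattern; the helper class and method names are invented.

```java
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionScannerLimiter;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;

final class LimiterAccessSketch {
  /** Returns the region server's limiter, or null when not running inside an HRegionServer. */
  static RegionScannerLimiter limiterOf(RegionServerServices rsServices) {
    return rsServices instanceof HRegionServer
      ? ((HRegionServer) rsServices).getRegionScannerLimiter()
      : null;
  }
}
```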
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1341,6 +1341,7 @@ private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Ship
s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s);
RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
s.setName(scannerName);
RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! "
+ scannerName + ", " + existing;
@@ -3330,6 +3331,11 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
contextBuilder.setTimeLimit(timeScope, timeLimit);
contextBuilder.setTrackMetrics(trackMetrics);
ScannerContext scannerContext = contextBuilder.build();

RegionScannerLimiter regionScannerLimiter =
((HRegionServer) region.rsServices).getRegionScannerLimiter();
regionScannerLimiter.setFilterRowsLimitReached(scanner.getName(), false);

boolean limitReached = false;
while (numOfResults < maxResults) {
// Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
@@ -3405,7 +3411,10 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
boolean resultsLimitReached = numOfResults >= maxResults;
limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;
boolean filterRowsLimitReached =
regionScannerLimiter.isFilterRowsLimitReached(scanner.getName());
limitReached =
sizeLimitReached || timeLimitReached || resultsLimitReached || filterRowsLimitReached;

if (limitReached || !moreRows) {
// With block size limit, we may exceed size limit without collecting any results.
@@ -3416,7 +3425,10 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
// there are more values to be read server side. If there aren't more values,
// marking it as a heartbeat is wasteful because the client will need to issue
// another ScanRequest only to realize that they already have all the values
if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
if (
moreRows
&& (timeLimitReached || sizeLimitReachedWithoutResults || filterRowsLimitReached)
) {
// Heartbeat messages occur when the time limit has been reached, or size limit has
// been reached before collecting any results. This can happen for heavily filtered
// scans which scan over too many blocks.
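Note, not part of the diff: because filterRowsLimitReached is folded into the same heartbeat condition as the time and size limits, a client needs no new handling while the kill switch is off; each scan RPC is simply cut short and answered as a heartbeat, and the client carries on. A hedged sketch of that from the client side (connection, table name and filter are placeholders).

```java
import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;

public class HeartbeatScanSketch {
  static long countMatches(Connection connection, Filter heavyFilter) throws IOException {
    long matches = 0;
    try (Table table = connection.getTable(TableName.valueOf("example_table"));
         ResultScanner scanner = table.getScanner(new Scan().setFilter(heavyFilter))) {
      for (Result result : scanner) {
        // With the limit enabled and the kill switch off, the server is expected to stop
        // each request after the configured number of filtered rows and reply with a
        // heartbeat, so this loop should still see every matching row; it just takes more
        // round trips instead of occupying one RPC handler for a long stretch.
        matches++;
      }
    }
    return matches;
  }
}
```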
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
@@ -110,4 +110,12 @@ default String getOperationId() {
* @throws IOException e
*/
boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException;

default void setName(String name) {

}

default String getName() {
return null;
}
}
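Note, not part of the diff: setName/getName are added as default methods, so RegionScanner implementations outside this patch keep compiling; an implementation that never gets a name returns null and is simply not tracked by the limiter. A minimal sketch (class name invented) of such a pre-existing implementation:

```java
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;

// Declared abstract so the sketch can focus on a single method; the point is that no
// setName/getName override is needed for an older implementation to keep compiling.
abstract class DelegatingScannerSketch implements RegionScanner {
  private final RegionScanner delegate;

  DelegatingScannerSketch(RegionScanner delegate) {
    this.delegate = delegate;
  }

  @Override
  public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
    // getName() on this wrapper falls back to the default and returns null, so the
    // limiter never tracks an entry for it.
    return delegate.nextRaw(result, scannerContext);
  }
}
```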
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
@@ -90,6 +91,7 @@ class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
private final ScannerContext defaultScannerContext;
private final FilterWrapper filter;
private final String operationId;
private String name;

private RegionServerServices rsServices;

@@ -487,7 +489,10 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
// Check if rowkey filter wants to exclude this row. If so, loop to next.
// Technically, if we hit limits before on this row, we don't need this call.
if (filterRowKey(current)) {
incrementCountOfRowsFilteredMetric(scannerContext);
if (incrementCountOfRowsFilteredMetric(scannerContext)) {
return scannerContext.setScannerState(NextState.FILTERED_ROWS_LIMIT_REACHED)
.hasMoreValues();
}
// early check, see HBASE-16296
if (isFilterDoneInternal()) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
@@ -552,7 +557,10 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
}

if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
incrementCountOfRowsFilteredMetric(scannerContext);
if (incrementCountOfRowsFilteredMetric(scannerContext)) {
return scannerContext.setScannerState(NextState.FILTERED_ROWS_LIMIT_REACHED)
.hasMoreValues();
}
results.clear();
boolean moreRows = nextRow(scannerContext, current);
if (!moreRows) {
@@ -604,7 +612,10 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
// Double check to prevent empty rows from appearing in result. It could be
// the case when SingleColumnValueExcludeFilter is used.
if (results.isEmpty()) {
incrementCountOfRowsFilteredMetric(scannerContext);
if (incrementCountOfRowsFilteredMetric(scannerContext)) {
return scannerContext.setScannerState(NextState.FILTERED_ROWS_LIMIT_REACHED)
.hasMoreValues();
}
boolean moreRows = nextRow(scannerContext, current);
if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
@@ -629,17 +640,42 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
}
}

private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
private boolean incrementCountOfRowsFilteredMetric(ScannerContext scannerContext)
throws DoNotRetryIOException {
region.filteredReadRequestsCount.increment();
if (region.getMetrics() != null) {
region.getMetrics().updateFilteredRecords();
}

if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
return;
if (scannerContext == null) {
return false;
}

scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
if (scannerContext.isTrackingMetrics()) {
scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
}

scannerContext.incrementFilteredRowsProgress(1);
long countOfRowsFiltered = scannerContext.getFilterRowsProgress();
if (region.rsServices instanceof HRegionServer) {
RegionScannerLimiter regionScannerLimiter =
((HRegionServer) region.rsServices).getRegionScannerLimiter();
long maxRowsFilteredPerRequest = regionScannerLimiter.getMaxRowsFilteredPerRequest();
if (maxRowsFilteredPerRequest > 0 && countOfRowsFiltered >= maxRowsFilteredPerRequest) {
regionScannerLimiter.setFilterRowsLimitReached(getName(), true);
if (regionScannerLimiter.killRequest()) {
String errMsg =
String.format("Too many rows filtered, higher than the limit threshold of %s, "
+ "so kill the scan request!", maxRowsFilteredPerRequest);
LOG.warn("ScannerContext={}, errMsg={}", scannerContext, errMsg);
throw new DoNotRetryIOException(errMsg);
} else {
return true;
}
}
}

return false;
}

private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
@@ -750,6 +786,9 @@ protected boolean shouldStop(Cell currentRowCell) {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
justification = "this method is only called inside close which is synchronized")
private void closeInternal() {
if (region.rsServices instanceof HRegionServer) {
((HRegionServer) region.rsServices).getRegionScannerLimiter().removeScanner(getName());
}
if (storeHeap != null) {
storeHeap.close();
storeHeap = null;
@@ -806,4 +845,14 @@ public void run() throws IOException {
// callback
this.close();
}

@Override
public void setName(String name) {
this.name = name;
}

@Override
public String getName() {
return name;
}
}
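Note, not part of the diff: with the kill switch on, the DoNotRetryIOException raised in incrementCountOfRowsFilteredMetric is meant to reach the caller rather than be retried. A hedged sketch of how a client might handle it (connection, table name and filter are placeholders; the exact client-side wrapping may differ):

```java
import java.io.IOException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;

public class KillSwitchScanSketch {
  static void scanOrGiveUp(Connection connection, Filter heavyFilter) throws IOException {
    try (Table table = connection.getTable(TableName.valueOf("example_table"));
         ResultScanner scanner = table.getScanner(new Scan().setFilter(heavyFilter))) {
      while (scanner.next() != null) {
        // consume results
      }
    } catch (DoNotRetryIOException e) {
      // Expected once the request has filtered more rows than the configured cap and
      // hbase.server.scanner.max.rows.filtered.reached.request.killed is true. Rework the
      // scan (tighter row range, more selective filter) rather than retrying it as-is.
    }
  }
}
```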
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java (new file)
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Limits the max count of rows filtered per scan request. The limiter applies globally to scan
* requests, and the config key is
* {@link RegionScannerLimiter#HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY}. When
* heavily filtered scan requests frequently cause high load on the RegionServer, set
* {@link RegionScannerLimiter#HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY} to a
* positive value (e.g. 100,000) to limit those scan requests. To also kill a scan request that
* reaches the limit, set
* {@link RegionScannerLimiter#HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_REACHED_REQUEST_KILLED_KEY}
* to true. To disable the feature, set
* {@link RegionScannerLimiter#HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY} to 0 (the
* default).
*/
@InterfaceAudience.Private
public class RegionScannerLimiter implements ConfigurationObserver {

private static final Logger LOG = LoggerFactory.getLogger(RegionScannerLimiter.class);

public static final String HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY =
"hbase.server.scanner.max.rows.filtered.per.request";

public static final String HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_REACHED_REQUEST_KILLED_KEY =
"hbase.server.scanner.max.rows.filtered.reached.request.killed";

// Max count of rows filtered per scan request. Zero (the default) means no limit.
private volatile long maxRowsFilteredPerRequest = 0;
// Whether to kill the scan request once the filtered-rows limit is reached.
private volatile boolean requestKilled = false;

private final ConcurrentMap<String, Boolean> scanners = new ConcurrentHashMap<>();

public RegionScannerLimiter(Configuration conf) {
onConfigurationChange(conf);
}

private <T> void updateLimiterConf(Configuration conf, String configKey, T oldValue,
Function<String, T> applyFunc) {
try {
if (conf.get(configKey) == null) {
return;
}
T targetValue = applyFunc.apply(configKey);
if (targetValue != null) {
LOG.info("Config key={}, old value={}, new value={}", configKey, oldValue, targetValue);
}
} catch (Exception e) {
LOG.error("Failed to update config key: {}", configKey, e);
}
}

public long getMaxRowsFilteredPerRequest() {
return this.maxRowsFilteredPerRequest;
}

public boolean isFilterRowsLimitReached(String scannerName) {
return scanners.getOrDefault(scannerName, false);
}

public void setFilterRowsLimitReached(String scannerName, boolean limitReached) {
scanners.put(scannerName, limitReached);
}

public void removeScanner(String scannerName) {
scanners.remove(scannerName);
}

public boolean killRequest() {
return requestKilled;
}

public ConcurrentMap<String, Boolean> getScanners() {
return scanners;
}

@Override
public void onConfigurationChange(Configuration conf) {
updateLimiterConf(conf, HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY,
maxRowsFilteredPerRequest, configKey -> {
long targetValue = conf.getLong(configKey, -1);
if (targetValue < 0) {
LOG.warn("Invalid parameter, should be greater than or equal to zero, target value: {}",
targetValue);
return null;
}
if (maxRowsFilteredPerRequest == targetValue) {
return null;
}
maxRowsFilteredPerRequest = targetValue;
return targetValue;
});
updateLimiterConf(conf, HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_REACHED_REQUEST_KILLED_KEY,
requestKilled, configKey -> {
boolean targetValue = conf.getBoolean(configKey, false);
if (targetValue == requestKilled) {
return null;
}
requestKilled = targetValue;
return targetValue;
});
}
}
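A test-style sketch (not part of the diff) of the new class in isolation; values are examples and the assertions assume -ea. It mainly shows that the limiter picks up changes through onConfigurationChange without a region server restart.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.RegionScannerLimiter;

public class RegionScannerLimiterUsageSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY,
      100_000L);
    RegionScannerLimiter limiter = new RegionScannerLimiter(conf);
    assert limiter.getMaxRowsFilteredPerRequest() == 100_000L;
    assert !limiter.killRequest(); // the kill switch defaults to false

    // Per-scanner bookkeeping, as driven by RSRpcServices and RegionScannerImpl:
    limiter.setFilterRowsLimitReached("scanner-1", true);
    assert limiter.isFilterRowsLimitReached("scanner-1");
    limiter.removeScanner("scanner-1"); // mirrors RegionScannerImpl#closeInternal

    // Online update: the limiter is a ConfigurationObserver, so a changed Configuration
    // takes effect without restarting the region server.
    conf.setBoolean(
      RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_REACHED_REQUEST_KILLED_KEY, true);
    limiter.onConfigurationChange(conf);
    assert limiter.killRequest();
  }
}
```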