HBASE-26045 Master control the global throughput of all compaction servers #3538

Closed · wants to merge 2 commits

Changes from 1 commit
--------
@@ -2256,6 +2256,17 @@ void cloneTableSchema(TableName tableName, TableName newTableName, boolean prese
*/
boolean isCompactionOffloadEnabled() throws IOException;

/**
* Update the compaction server total throughput bounds.
* @param upperBound the total throughput upper bound of all compaction servers
Contributor: When we talk of compaction servers here, are we talking about the new compaction server facility or are we talking about regionservers?

Contributor (Author): We can control compaction throughput on compaction servers; we cannot control it on regionservers.

Contributor: Most deploys will not use a compaction server suite. Is it correct then to talk of compaction servers in our Admin API? Should there be another channel for interacting with compaction servers? A CompactionServersAdmin?

Contributor (Author): This method talks to the master only; compaction servers receive the throughput control values through their periodic heartbeat report.

* @param lowerBound the total throughput lower bound of all compaction servers
* @param offPeak the total throughput offPeak bound of all compaction servers
* @return the current total throughput bounds of all compaction servers
* @throws IOException if a remote or network exception occurs
*/
Map<String, Long> updateCompactionServerTotalThroughput(Long upperBound, Long lowerBound,
Contributor: We have 'CompactionServer' in the method name, so is this global control only for compaction servers? Can region servers not have the same limit? And why Long, not long?

Contributor (Author) @nyl3532016 commented Sep 10, 2021: Yes, we do not control the regionservers' compaction throughput in this PR (is that OK?). An HRegionServer can already adjust its compaction throughput via the update_config shell command with no restart, because it implements the ConfigurationObserver interface (see the sketch just below).

Long offPeak) throws IOException;
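
As background for the comment above, here is a minimal sketch of the ConfigurationObserver mechanism the author refers to. The interface and the update_config behavior are existing HBase facilities; the observer class, field, and default value below are illustrative assumptions, not code from this PR.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;

// Illustrative only: components registered with the server's
// ConfigurationManager receive onConfigurationChange() when an operator
// runs the update_config shell command, so settings can change without
// a server restart.
public class CompactionThroughputObserver implements ConfigurationObserver {

  private volatile long maxThroughputHigherBound;

  @Override
  public void onConfigurationChange(Configuration conf) {
    // Re-read the bound from the freshly reloaded configuration.
    this.maxThroughputHigherBound = conf.getLong(
        "hbase.hstore.compaction.throughput.higher.bound",
        100L * 1024 * 1024); // default value is an assumption for this sketch
  }
}
```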

/**
* Switch the exceed throttle quota. If enabled, user/table/namespace throttle quota
* can be exceeded if region server has available quota.
--------
@@ -918,6 +918,12 @@ public boolean switchCompactionOffload(boolean enable) throws IOException {
return get(admin.switchCompactionOffload(enable));
}

@Override
public Map<String, Long> updateCompactionServerTotalThroughput(Long upperBound, Long lowerBound,
Long offPeak) throws IOException {
return get(admin.updateCompactionServerTotalThroughput(upperBound, lowerBound, offPeak));
}
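
The blocking implementation above simply delegates to the async admin. To make the intended call pattern concrete, a hedged usage sketch of the new method follows; the bound values are arbitrary, and the "values <= 0 leave the corresponding bound unchanged" behavior comes from the MasterRpcServices hunk later in this diff.

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class UpdateCompactionThroughputExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin()) {
      // Raise the aggregate upper bound to 2 GB/s across all compaction
      // servers; pass -1 to keep the lower and off-peak bounds as they are.
      Map<String, Long> bounds = admin.updateCompactionServerTotalThroughput(
          2048L * 1024 * 1024, -1L, -1L);
      // The master echoes the effective bounds back, keyed as "UpperBound",
      // "LowerBound", and "OffPeak" (see the RawAsyncHBaseAdmin hunk below).
      bounds.forEach((key, value) -> System.out.println(key + " = " + value));
    }
  }
}
```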

@Override
public boolean isCompactionOffloadEnabled() throws IOException {
return get(admin.isCompactionOffloadEnabled());
--------
@@ -1459,6 +1459,16 @@ CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState
*/
CompletableFuture<Boolean> switchCompactionOffload(boolean enable);

/**
* Update the compaction server total throughput bounds.
* @param upperBound the total throughput upper bound of all compaction servers
* @param lowerBound the total throughput lower bound of all compaction servers
* @param offPeak the total throughput offPeak bound of all compaction servers
* @return the current total throughput bounds of all compaction servers
*/
CompletableFuture<Map<String, Long>> updateCompactionServerTotalThroughput(Long upperBound,
Long lowerBound, Long offPeak);
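
A corresponding hedged sketch for the asynchronous variant, assuming an AsyncAdmin instance already obtained from AsyncConnection.getAdmin():

```java
// Hedged sketch: 1 GB/s upper bound, 256 MB/s lower bound, off-peak unchanged.
asyncAdmin
    .updateCompactionServerTotalThroughput(
        1024L * 1024 * 1024, // upper bound
        256L * 1024 * 1024,  // lower bound
        -1L)                 // <= 0 keeps the current off-peak value
    .thenAccept(bounds -> bounds
        .forEach((key, value) -> System.out.println(key + " = " + value)));
```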

/**
* Get if the compaction offload is enabled.
* @return True if compaction offload is enabled
--------
@@ -797,6 +797,12 @@ public CompletableFuture<Boolean> switchCompactionOffload(boolean enable) {
return wrap(rawAdmin.switchCompactionOffload(enable));
}

@Override
public CompletableFuture<Map<String, Long>> updateCompactionServerTotalThroughput(Long upperBound,
Long lowerBound, Long offPeak) {
return wrap(rawAdmin.updateCompactionServerTotalThroughput(upperBound, lowerBound, offPeak));
}

@Override
public CompletableFuture<Boolean> isCompactionOffloadEnabled() {
return wrap(rawAdmin.isCompactionOffloadEnabled());
--------
@@ -283,6 +283,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UpdateCompactionServerTotalThroughputRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UpdateCompactionServerTotalThroughputResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
@@ -3822,6 +3824,27 @@ public CompletableFuture<Boolean> switchCompactionOffload(boolean enable) {
return future;
}

@Override
public CompletableFuture<Map<String, Long>> updateCompactionServerTotalThroughput(Long upperBound,
Long lowerBound, Long offPeak) {
CompletableFuture<Map<String, Long>> future = this.<Map<String, Long>> newMasterCaller().action(
(controller, stub) -> this
.<UpdateCompactionServerTotalThroughputRequest, UpdateCompactionServerTotalThroughputResponse, Map<String, Long>> call(
controller, stub,
UpdateCompactionServerTotalThroughputRequest.newBuilder()
.setMaxThroughputUpperBound(upperBound).setMaxThroughputLowerBound(lowerBound)
.setMaxThroughputOffPeak(offPeak).build(),
(s, c, req, done) -> s.updateCompactionServerTotalThroughput(c, req, done), resp -> {
Map<String, Long> result = new HashMap<>();
result.put("UpperBound", resp.getMaxThroughputUpperBound());
result.put("LowerBound", resp.getMaxThroughputLowerBound());
result.put("OffPeak", resp.getMaxThroughputOffPeak());
return result;
}))
.call();
return future;
}

@Override
public CompletableFuture<Boolean> isCompactionOffloadEnabled() {
CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
--------
@@ -37,6 +37,9 @@ message CompactionServerReportRequest {
}

message CompactionServerReportResponse {
required int64 max_throughput_upper_bound = 1 ;
required int64 max_throughput_lower_bound = 2 ;
required int64 max_throughput_off_peak = 3 ;
}

service CompactionServerStatusService {
--------
15 changes: 15 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
@@ -732,6 +732,18 @@ message SwitchCompactionOffloadResponse {
required bool previous_compaction_offload_enabled = 1;
}

message UpdateCompactionServerTotalThroughputRequest {
required int64 max_throughput_upper_bound = 1;
required int64 max_throughput_lower_bound = 2;
required int64 max_throughput_off_peak = 3;
}

message UpdateCompactionServerTotalThroughputResponse {
required int64 max_throughput_upper_bound = 1;
required int64 max_throughput_lower_bound = 2;
required int64 max_throughput_off_peak = 3;
}

service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
@@ -1112,6 +1124,9 @@ service MasterService {
rpc SwitchCompactionOffload (SwitchCompactionOffloadRequest)
returns (SwitchCompactionOffloadResponse);

rpc UpdateCompactionServerTotalThroughput(UpdateCompactionServerTotalThroughputRequest)
returns (UpdateCompactionServerTotalThroughputResponse);

/** Get if is compaction offload enabled */
rpc IsCompactionOffloadEnabled (IsCompactionOffloadEnabledRequest)
returns (IsCompactionOffloadEnabledResponse);
--------
@@ -52,6 +52,7 @@
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -94,6 +95,10 @@ public class CompactionThreadManager implements ThroughputControllerService {
new ConcurrentHashMap<>();
private static CompactionFilesCache compactionFilesCache = new CompactionFilesCache();

public ThroughputController getCompactionThroughputController() {
return compactThreadControl.getCompactionThroughputController();
}

CompactionThreadManager(final Configuration conf, HCompactionServer server) {
this.conf = conf;
this.server = server;
--------
@@ -32,6 +32,8 @@
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.UserProvider;
@@ -179,7 +181,20 @@ private boolean tryCompactionServerReport(long reportStartTime, long reportEndTi
CompactionServerStatusProtos.CompactionServerReportRequest.newBuilder();
request.setServer(ProtobufUtil.toServerName(getServerName()));
request.setLoad(sl);
this.cssStub.compactionServerReport(null, request.build());
CompactionServerStatusProtos.CompactionServerReportResponse compactionServerReportResponse =
  this.cssStub.compactionServerReport(null, request.build());
ThroughputController throughputController =
  compactionThreadManager.getCompactionThroughputController();
if (throughputController instanceof PressureAwareCompactionThroughputController) {
  // Apply the per-server share of the global bounds handed back by the master.
  PressureAwareCompactionThroughputController controller =
    (PressureAwareCompactionThroughputController) throughputController;
  controller.setMaxThroughputUpperBound(
    compactionServerReportResponse.getMaxThroughputUpperBound());
  controller.setMaxThroughputLowerBound(
    compactionServerReportResponse.getMaxThroughputLowerBound());
  controller.setMaxThroughputOffPeak(
    compactionServerReportResponse.getMaxThroughputOffPeak());
}
} catch (ServiceException se) {
IOException ioe = ProtobufUtil.getRemoteException(se);
if (ioe instanceof YouAreDeadException) {
--------
@@ -124,6 +124,7 @@
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Triple;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience;
@@ -343,6 +344,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UpdateCompactionServerTotalThroughputRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UpdateCompactionServerTotalThroughputResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse.NamespaceQuotaSnapshot;
@@ -682,14 +685,17 @@ public CompactionServerReportResponse compactionServerReport(RpcController contr
versionNumber = VersionInfoUtil.getVersionNumber(versionInfo);
}
ServerName serverName = ProtobufUtil.toServerName(request.getServer());
CompactionServerMetrics newLoad =
CompactionServerMetricsBuilder.toCompactionServerMetrics(serverName, versionNumber,
version, request.getLoad());
master.getCompactionOffloadManager().compactionServerReport(serverName, newLoad);
CompactionServerMetrics newLoad = CompactionServerMetricsBuilder
.toCompactionServerMetrics(serverName, versionNumber, version, request.getLoad());
Triple<Long, Long, Long> throughput =
master.getCompactionOffloadManager().compactionServerReport(serverName, newLoad);
return CompactionServerReportResponse.newBuilder()
.setMaxThroughputUpperBound(throughput.getFirst())
.setMaxThroughputLowerBound(throughput.getSecond())
.setMaxThroughputOffPeak(throughput.getThird()).build();
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
return CompactionServerReportResponse.newBuilder().build();
}

@Override
@@ -3496,4 +3502,29 @@ public CompleteCompactionResponse completeCompaction(RpcController controller,
throw new UnsupportedOperationException("master not receive completeCompaction");
}

@Override
public UpdateCompactionServerTotalThroughputResponse updateCompactionServerTotalThroughput(
RpcController controller, UpdateCompactionServerTotalThroughputRequest request)
throws ServiceException {
if (request.getMaxThroughputUpperBound() > 0) {
master.getCompactionOffloadManager()
.setMaxThroughputUpperBound(request.getMaxThroughputUpperBound());
}
if (request.getMaxThroughputLowerBound() > 0) {
master.getCompactionOffloadManager()
.setMaxThroughputLowerBound(request.getMaxThroughputLowerBound());
}
if (request.getMaxThroughputOffPeak() > 0) {
master.getCompactionOffloadManager()
.setMaxThroughputOffPeak(request.getMaxThroughputOffPeak());
}
return UpdateCompactionServerTotalThroughputResponse.newBuilder()
.setMaxThroughputUpperBound(
master.getCompactionOffloadManager().getMaxThroughputUpperBound())
.setMaxThroughputLowerBound(
master.getCompactionOffloadManager().getMaxThroughputLowerBound())
.setMaxThroughputOffPeak(master.getCompactionOffloadManager().getMaxThroughputOffPeak())
.build();
}

}
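
Taken together, the master-side pieces form the control loop for this feature: updateCompactionServerTotalThroughput stores the operator-supplied bounds in CompactionOffloadManager, compactionServerReport divides the stored bounds among the live compaction servers on every heartbeat, and each CompactionServerReportResponse carries that per-server share back to the reporting server, which applies it to its PressureAwareCompactionThroughputController (see the HCompactionServer hunk above).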
--------
@@ -33,6 +33,7 @@
import org.apache.hadoop.hbase.master.procedure.SwitchCompactionOffloadProcedure;
import org.apache.hadoop.hbase.regionserver.CompactionOffloadSwitchStorage;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Triple;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,9 +56,46 @@ public class CompactionOffloadManager {
/** Map of registered servers to their current load */
private final Cache<ServerName, CompactionServerMetrics> onlineServers;
private CompactionOffloadSwitchStorage compactionOffloadSwitchStorage;
private volatile long maxThroughputUpperBound;
private volatile long maxThroughputLowerBound;
private volatile long maxThroughputOffPeak;
// throughput config
public static final String COMPACTION_SERVER_MAX_THROUGHPUT_HIGHER_BOUND =
"hbase.compaction.server.compaction.throughput.higher.bound";
public static final long DEFAULT_COMPACTION_SERVER_MAX_THROUGHPUT_HIGHER_BOUND =
  1000L * 1024 * 1024;
public static final String COMPACTION_SERVER_MAX_THROUGHPUT_LOWER_BOUND =
"hbase.compaction.server.compaction.throughput.lower.bound";
public static final long DEFAULT_COMPACTION_SERVER_MAX_THROUGHPUT_LOWER_BOUND =
  500L * 1024 * 1024;
public static final String COMPACTION_SERVER_MAX_THROUGHPUT_OFFPEAK =
"hbase.compaction.server.compaction.throughput.offpeak";
public static final long DEFAULT_COMPACTION_SERVER_MAX_THROUGHPUT_OFFPEAK = Long.MAX_VALUE;
private static final Logger LOG =
LoggerFactory.getLogger(CompactionOffloadManager.class.getName());

public long getMaxThroughputUpperBound() {
return maxThroughputUpperBound;
}

public void setMaxThroughputUpperBound(long maxThroughputUpperBound) {
this.maxThroughputUpperBound = maxThroughputUpperBound;
}

public long getMaxThroughputLowerBound() {
return maxThroughputLowerBound;
}

public void setMaxThroughputLowerBound(long maxThroughputLowerBound) {
this.maxThroughputLowerBound = maxThroughputLowerBound;
}

public long getMaxThroughputOffPeak() {
return maxThroughputOffPeak;
}

public void setMaxThroughputOffPeak(long maxThroughputOffPeak) {
this.maxThroughputOffPeak = maxThroughputOffPeak;
}

public CompactionOffloadManager(final MasterServices master) {
this.masterServices = master;
int compactionServerMsgInterval =
@@ -68,10 +106,22 @@ public CompactionOffloadManager(final MasterServices master) {
compactionServerMsgInterval * compactionServerExpiredFactor, TimeUnit.MILLISECONDS).build();
this.compactionOffloadSwitchStorage = new CompactionOffloadSwitchStorage(
masterServices.getZooKeeper(), masterServices.getConfiguration());
this.maxThroughputUpperBound =
master.getConfiguration().getLong(COMPACTION_SERVER_MAX_THROUGHPUT_HIGHER_BOUND,
DEFAULT_COMPACTION_SERVER_MAX_THROUGHPUT_HIGHER_BOUND);
this.maxThroughputLowerBound =
master.getConfiguration().getLong(COMPACTION_SERVER_MAX_THROUGHPUT_LOWER_BOUND,
DEFAULT_COMPACTION_SERVER_MAX_THROUGHPUT_LOWER_BOUND);
this.maxThroughputOffPeak = master.getConfiguration().getLong(
COMPACTION_SERVER_MAX_THROUGHPUT_OFFPEAK, DEFAULT_COMPACTION_SERVER_MAX_THROUGHPUT_OFFPEAK);
}

public void compactionServerReport(ServerName sn, CompactionServerMetrics sl) {
public Triple<Long, Long, Long> compactionServerReport(ServerName sn,
CompactionServerMetrics sl) {
this.onlineServers.put(sn, sl);
long size = Math.max(onlineServers.size(), 1);
return new Triple<>(maxThroughputUpperBound / size, maxThroughputLowerBound / size,
maxThroughputOffPeak / size);
}
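
For example, with the default 1000 MB/s total upper bound and four live compaction servers, each heartbeat response grants a 250 MB/s upper bound (the lower and off-peak bounds are divided the same way); the Math.max(onlineServers.size(), 1) guard avoids dividing by zero before any server has reported.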

--------
@@ -70,7 +70,7 @@ public class PressureAwareCompactionThroughputController extends PressureAwareTh
private static final String HBASE_HSTORE_COMPACTION_THROUGHPUT_CONTROL_CHECK_INTERVAL =
"hbase.hstore.compaction.throughput.control.check.interval";

private long maxThroughputOffpeak;
private long maxThroughputOffPeak;

@Override
public void setup(final ThroughputControllerService server) {
@@ -90,7 +90,7 @@ private void tune(double compactionPressure) {
// set to unlimited if some stores already reach the blocking store file count
maxThroughputToSet = Double.MAX_VALUE;
} else if (offPeakHours.isOffPeakHour()) {
maxThroughputToSet = maxThroughputOffpeak;
maxThroughputToSet = maxThroughputOffPeak;
} else {
// compactionPressure is between 0.0 and 1.0, we use a simple linear formula to
// calculate the throughput limitation.
@@ -122,7 +122,7 @@ public void setConf(Configuration conf) {
this.maxThroughputLowerBound =
conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND);
this.maxThroughputOffpeak =
this.maxThroughputOffPeak =
conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK,
DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK);
this.offPeakHours = OffPeakHours.getInstance(conf);
@@ -136,7 +136,7 @@ public void setConf(Configuration conf) {
LOG.info("Compaction throughput configurations, higher bound: "
+ throughputDesc(maxThroughputUpperBound) + ", lower bound "
+ throughputDesc(maxThroughputLowerBound) + ", off peak: "
+ throughputDesc(maxThroughputOffpeak) + ", tuning period: " + tuningPeriod + " ms");
+ throughputDesc(maxThroughputOffPeak) + ", tuning period: " + tuningPeriod + " ms");
}

@Override
@@ -145,4 +145,12 @@ public String toString() {
+ throughputDesc(getMaxThroughput()) + ", activeCompactions=" + activeOperations.size()
+ "]";
}

public void setMaxThroughputOffPeak(long maxThroughputOffPeak) {
this.maxThroughputOffPeak = maxThroughputOffPeak;
}

public long getMaxThroughputOffPeak() {
return maxThroughputOffPeak;
}
}
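
A note on the design: this getter/setter pair is what lets the heartbeat handler in HCompactionServer override the statically configured off-peak bound at runtime. The setMaxThroughputUpperBound and setMaxThroughputLowerBound calls used there are not visible in this hunk; presumably they live on the parent PressureAwareThroughputController or arrive in the other commit of this PR.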