Skip to content

Commit

Permalink
HBASE-26323 Introduce a Snapshot Procedure (#4115)
Browse files Browse the repository at this point in the history
Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
frostruan authored Mar 12, 2022
1 parent c248521 commit fd301ad
Show file tree
Hide file tree
Showing 30 changed files with 2,713 additions and 334 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1901,51 +1901,67 @@ public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
return failedFuture(e);
}
CompletableFuture<Void> future = new CompletableFuture<>();
final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
addListener(this.<Long> newMasterCaller()
.action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
resp -> resp.getExpectedTimeout()))
.call(), (expectedTimeout, err) -> {
final SnapshotRequest request =
SnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
.setNonce(ng.newNonce()).build();
addListener(this.<SnapshotResponse> newMasterCaller()
.action((controller, stub) ->
this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call(controller, stub,
request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp))
.call(), (resp, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
TimerTask pollingTask = new TimerTask() {
int tries = 0;
long startTime = EnvironmentEdgeManager.currentTime();
long endTime = startTime + expectedTimeout;
long maxPauseTime = expectedTimeout / maxAttempts;

@Override
public void run(Timeout timeout) throws Exception {
if (EnvironmentEdgeManager.currentTime() < endTime) {
addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else if (done) {
future.complete(null);
} else {
// retry again after pauseTime.
long pauseTime =
ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
pauseTime = Math.min(pauseTime, maxPauseTime);
AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
TimeUnit.MILLISECONDS);
}
});
} else {
future.completeExceptionally(
new SnapshotCreationException("Snapshot '" + snapshot.getName() +
"' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
}
}
};
AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
waitSnapshotFinish(snapshotDesc, future, resp);
});
return future;
}

// This is for keeping compatibility with old implementation.
// If there is a procId field in the response, then the snapshot will be operated with a
// SnapshotProcedure, otherwise the snapshot will be coordinated by zk.
private void waitSnapshotFinish(SnapshotDescription snapshot,
CompletableFuture<Void> future, SnapshotResponse resp) {
if (resp.hasProcId()) {
getProcedureResult(resp.getProcId(), future, 0);
addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName()));
} else {
long expectedTimeout = resp.getExpectedTimeout();
TimerTask pollingTask = new TimerTask() {
int tries = 0;
long startTime = EnvironmentEdgeManager.currentTime();
long endTime = startTime + expectedTimeout;
long maxPauseTime = expectedTimeout / maxAttempts;

@Override
public void run(Timeout timeout) throws Exception {
if (EnvironmentEdgeManager.currentTime() < endTime) {
addListener(isSnapshotFinished(snapshot), (done, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else if (done) {
future.complete(null);
} else {
// retry again after pauseTime.
long pauseTime = ConnectionUtils
.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
pauseTime = Math.min(pauseTime, maxPauseTime);
AsyncConnectionImpl.RETRY_TIMER
.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
}
});
} else {
future.completeExceptionally(new SnapshotCreationException(
"Snapshot '" + snapshot.getName() + "' wasn't completed in expectedTime:"
+ expectedTimeout + " ms", snapshot));
}
}
};
AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
}
}

@Override
public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
return this
Expand Down Expand Up @@ -2800,6 +2816,18 @@ String getOperationType() {
}
}

private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer {
SnapshotProcedureBiConsumer(TableName tableName) {
super(tableName);
}

@Override
String getOperationType() {
return "SNAPSHOT";
}
}


private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
private final String peerId;
private final Supplier<String> getOperation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,10 +437,13 @@ message IsCleanerChoreEnabledResponse {

message SnapshotRequest {
required SnapshotDescription snapshot = 1;
optional uint64 nonce_group = 2 [default = 0];
optional uint64 nonce = 3 [default = 0];
}

message SnapshotResponse {
required int64 expected_timeout = 1;
optional int64 proc_id = 2;
}

message GetCompletedSnapshotsRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,45 @@ message RestoreParentToChildRegionsPair {
required string child2_region_name = 3;
}

enum SnapshotState {
SNAPSHOT_PREPARE = 1;
SNAPSHOT_PRE_OPERATION = 2;
SNAPSHOT_WRITE_SNAPSHOT_INFO = 3;
SNAPSHOT_SNAPSHOT_ONLINE_REGIONS = 4;
SNAPSHOT_SNAPSHOT_SPLIT_REGIONS = 5;
SNAPSHOT_SNAPSHOT_CLOSED_REGIONS = 6;
SNAPSHOT_SNAPSHOT_MOB_REGION = 7;
SNAPSHOT_CONSOLIDATE_SNAPSHOT = 8;
SNAPSHOT_VERIFIER_SNAPSHOT = 9;
SNAPSHOT_COMPLETE_SNAPSHOT = 10;
SNAPSHOT_POST_OPERATION = 11;
}

message SnapshotProcedureStateData {
required SnapshotDescription snapshot = 1;
}

message SnapshotRegionProcedureStateData {
required RegionInfo region = 1;
required SnapshotDescription snapshot = 2;
}

message SnapshotRegionParameter {
required RegionInfo region = 1;
required SnapshotDescription snapshot = 2;
}

message SnapshotVerifyProcedureStateData {
required SnapshotDescription snapshot = 1;
required RegionInfo region = 2;
optional ServerName target_server = 3;
}

message SnapshotVerifyParameter {
required SnapshotDescription snapshot = 1;
required RegionInfo region = 2;
}

enum CloneSnapshotState {
CLONE_SNAPSHOT_PRE_OPERATION = 1;
CLONE_SNAPSHOT_WRITE_FS_LAYOUT = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,21 @@ public enum EventType {
*
* RS_CLAIM_REPLICATION_QUEUE
*/
RS_CLAIM_REPLICATION_QUEUE(86, ExecutorType.RS_CLAIM_REPLICATION_QUEUE);
RS_CLAIM_REPLICATION_QUEUE(86, ExecutorType.RS_CLAIM_REPLICATION_QUEUE),

/**
* RS snapshot regions.<br>
*
* RS_SNAPSHOT_REGIONS
*/
RS_SNAPSHOT_REGIONS(87, ExecutorType.RS_SNAPSHOT_OPERATIONS),

/**
* RS verify snapshot.<br>
*
* RS_VERIFY_SNAPSHOT
*/
RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS);

private final int code;
private final ExecutorType executor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public enum ExecutorType {
RS_REPLAY_SYNC_REPLICATION_WAL(32),
RS_SWITCH_RPC_THROTTLE(33),
RS_IN_MEMORY_COMPACTION(34),
RS_CLAIM_REPLICATION_QUEUE(35);
RS_CLAIM_REPLICATION_QUEUE(35),
RS_SNAPSHOT_OPERATIONS(36);

ExecutorType(int value) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1092,8 +1092,8 @@ public void call(MasterObserver observer) throws IOException {
}

public void preSnapshot(final SnapshotDescription snapshot,
final TableDescriptor hTableDescriptor) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
final TableDescriptor hTableDescriptor, final User user) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
public void call(MasterObserver observer) throws IOException {
observer.preSnapshot(this, snapshot, hTableDescriptor);
Expand All @@ -1102,8 +1102,8 @@ public void call(MasterObserver observer) throws IOException {
}

public void postSnapshot(final SnapshotDescription snapshot,
final TableDescriptor hTableDescriptor) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
final TableDescriptor hTableDescriptor, final User user) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
@Override
public void call(MasterObserver observer) throws IOException {
observer.postSnapshot(this, snapshot, hTableDescriptor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1736,12 +1736,25 @@ public SnapshotResponse snapshot(RpcController controller,
// get the snapshot information
SnapshotDescription snapshot = SnapshotDescriptionUtils.validate(
request.getSnapshot(), server.getConfiguration());
server.snapshotManager.takeSnapshot(snapshot);

// send back the max amount of time the client should wait for the snapshot to complete
long waitTime = SnapshotDescriptionUtils.getMaxMasterTimeout(server.getConfiguration(),
snapshot.getType(), SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME);
return SnapshotResponse.newBuilder().setExpectedTimeout(waitTime).build();

SnapshotResponse.Builder builder = SnapshotResponse.newBuilder().setExpectedTimeout(waitTime);

// If there is nonce group and nonce in the snapshot request, then the client can
// handle snapshot procedure procId. And if enable the snapshot procedure, we
// will do the snapshot work with proc-v2, otherwise we will fall back to zk proc.
if (request.hasNonceGroup() && request.hasNonce() &&
server.snapshotManager.snapshotProcedureEnabled()) {
long nonceGroup = request.getNonceGroup();
long nonce = request.getNonce();
long procId = server.snapshotManager.takeSnapshot(snapshot, nonceGroup, nonce);
return builder.setProcId(procId).build();
} else {
server.snapshotManager.takeSnapshot(snapshot);
return builder.build();
}
} catch (ForeignException e) {
throw new ServiceException(e.getCause());
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -73,7 +71,7 @@ public class SplitWALManager {
private static final Logger LOG = LoggerFactory.getLogger(SplitWALManager.class);

private final MasterServices master;
private final SplitWorkerAssigner splitWorkerAssigner;
private final WorkerAssigner splitWorkerAssigner;
private final Path rootDir;
private final FileSystem fs;
private final Configuration conf;
Expand All @@ -82,8 +80,9 @@ public class SplitWALManager {
public SplitWALManager(MasterServices master) throws IOException {
this.master = master;
this.conf = master.getConfiguration();
this.splitWorkerAssigner = new SplitWorkerAssigner(this.master,
conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
this.splitWorkerAssigner = new WorkerAssigner(this.master,
conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER),
new ProcedureEvent<>("split-WAL-worker-assigning"));
this.rootDir = master.getMasterFileSystem().getWALRootDir();
this.fs = master.getMasterFileSystem().getWALFileSystem();
this.walArchiveDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
Expand Down Expand Up @@ -189,67 +188,4 @@ public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler sc
public void addUsedSplitWALWorker(ServerName worker){
splitWorkerAssigner.addUsedWorker(worker);
}

/**
* help assign and release a worker for each WAL splitting task
* For each worker, concurrent running splitting task should be no more than maxSplitTasks
* If a task failed to acquire a worker, it will suspend and wait for workers available
*
*/
private static final class SplitWorkerAssigner implements ServerListener {
private int maxSplitTasks;
private final ProcedureEvent<?> event;
private Map<ServerName, Integer> currentWorkers = new HashMap<>();
private MasterServices master;

public SplitWorkerAssigner(MasterServices master, int maxSplitTasks) {
this.maxSplitTasks = maxSplitTasks;
this.master = master;
this.event = new ProcedureEvent<>("split-WAL-worker-assigning");
// ServerManager might be null in a test context where we are mocking; allow for this
ServerManager sm = this.master.getServerManager();
if (sm != null) {
sm.registerListener(this);
}
}

public synchronized Optional<ServerName> acquire() {
List<ServerName> serverList = master.getServerManager().getOnlineServersList();
Collections.shuffle(serverList);
Optional<ServerName> worker = serverList.stream().filter(
serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0)
.findAny();
if (worker.isPresent()) {
currentWorkers.compute(worker.get(), (serverName,
availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1);
}
return worker;
}

public synchronized void release(ServerName serverName) {
currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
}

public void suspend(Procedure<?> proc) {
event.suspend();
event.suspendIfNotReady(proc);
}

public void wake(MasterProcedureScheduler scheduler) {
if (!event.isReady()) {
event.wake(scheduler);
}
}

@Override
public void serverAdded(ServerName worker) {
this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
}

public synchronized void addUsedWorker(ServerName worker) {
// load used worker when master restart
currentWorkers.compute(worker, (serverName,
availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1);
}
}
}
Loading

0 comments on commit fd301ad

Please sign in to comment.