From fd301ad55f145c3aad5e49f8b4b97b11683d2b74 Mon Sep 17 00:00:00 2001 From: Ruanhui <32773751+frostruan@users.noreply.github.com> Date: Sat, 12 Mar 2022 23:21:22 +0800 Subject: [PATCH] HBASE-26323 Introduce a Snapshot Procedure (#4115) Signed-off-by: Duo Zhang --- .../hbase/client/RawAsyncHBaseAdmin.java | 102 ++-- .../main/protobuf/server/master/Master.proto | 3 + .../server/master/MasterProcedure.proto | 39 ++ .../hadoop/hbase/executor/EventType.java | 16 +- .../hadoop/hbase/executor/ExecutorType.java | 3 +- .../hbase/master/MasterCoprocessorHost.java | 8 +- .../hbase/master/MasterRpcServices.java | 19 +- .../hadoop/hbase/master/SplitWALManager.java | 72 +-- .../hadoop/hbase/master/WorkerAssigner.java | 95 ++++ .../procedure/ServerProcedureInterface.java | 7 +- .../hbase/master/procedure/ServerQueue.java | 1 + .../master/procedure/SnapshotProcedure.java | 475 ++++++++++++++++++ .../procedure/SnapshotRegionProcedure.java | 268 ++++++++++ .../procedure/SnapshotVerifyProcedure.java | 226 +++++++++ .../procedure/TableProcedureInterface.java | 2 +- .../hbase/master/procedure/TableQueue.java | 2 + .../snapshot/MasterSnapshotVerifier.java | 46 +- .../master/snapshot/SnapshotManager.java | 369 +++++++++++--- .../master/snapshot/TakeSnapshotHandler.java | 14 +- .../hbase/regionserver/HRegionServer.java | 12 + .../regionserver/RSSnapshotVerifier.java | 130 +++++ .../regionserver/SnapshotRegionCallable.java | 96 ++++ .../regionserver/SnapshotVerifyCallable.java | 48 ++ .../snapshot/SnapshotDescriptionUtils.java | 31 +- .../hbase/snapshot/SnapshotReferenceUtil.java | 14 +- .../procedure/TestSnapshotProcedure.java | 413 +++++++++++++++ .../TestSnapshotRegionProcedure.java | 148 ++++++ .../TestSnapshotVerifyProcedure.java | 174 +++++++ .../snapshot/TestSnapshotWhileRSCrashes.java | 98 ---- .../regionserver/TestRSSnapshotVerifier.java | 116 +++++ 30 files changed, 2713 insertions(+), 334 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotRegionProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSSnapshotVerifier.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotRegionCallable.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotVerifyCallable.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedure.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotRegionProcedure.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotVerifyProcedure.java delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotWhileRSCrashes.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSSnapshotVerifier.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index b7bf6c5c9c48..572eb0960ea1 100644 --- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -1901,51 +1901,67 @@ public CompletableFuture snapshot(SnapshotDescription snapshotDesc) { return failedFuture(e); } CompletableFuture future = new CompletableFuture<>(); - final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build(); - addListener(this. newMasterCaller() - .action((controller, stub) -> this. call(controller, - stub, request, (s, c, req, done) -> s.snapshot(c, req, done), - resp -> resp.getExpectedTimeout())) - .call(), (expectedTimeout, err) -> { + final SnapshotRequest request = + SnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup()) + .setNonce(ng.newNonce()).build(); + addListener(this. newMasterCaller() + .action((controller, stub) -> + this. call(controller, stub, + request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp)) + .call(), (resp, err) -> { if (err != null) { future.completeExceptionally(err); return; } - TimerTask pollingTask = new TimerTask() { - int tries = 0; - long startTime = EnvironmentEdgeManager.currentTime(); - long endTime = startTime + expectedTimeout; - long maxPauseTime = expectedTimeout / maxAttempts; - - @Override - public void run(Timeout timeout) throws Exception { - if (EnvironmentEdgeManager.currentTime() < endTime) { - addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else if (done) { - future.complete(null); - } else { - // retry again after pauseTime. - long pauseTime = - ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); - pauseTime = Math.min(pauseTime, maxPauseTime); - AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, - TimeUnit.MILLISECONDS); - } - }); - } else { - future.completeExceptionally( - new SnapshotCreationException("Snapshot '" + snapshot.getName() + - "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc)); - } - } - }; - AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); + waitSnapshotFinish(snapshotDesc, future, resp); }); return future; } + // This is for keeping compatibility with old implementation. + // If there is a procId field in the response, then the snapshot will be operated with a + // SnapshotProcedure, otherwise the snapshot will be coordinated by zk. + private void waitSnapshotFinish(SnapshotDescription snapshot, + CompletableFuture future, SnapshotResponse resp) { + if (resp.hasProcId()) { + getProcedureResult(resp.getProcId(), future, 0); + addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName())); + } else { + long expectedTimeout = resp.getExpectedTimeout(); + TimerTask pollingTask = new TimerTask() { + int tries = 0; + long startTime = EnvironmentEdgeManager.currentTime(); + long endTime = startTime + expectedTimeout; + long maxPauseTime = expectedTimeout / maxAttempts; + + @Override + public void run(Timeout timeout) throws Exception { + if (EnvironmentEdgeManager.currentTime() < endTime) { + addListener(isSnapshotFinished(snapshot), (done, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else if (done) { + future.complete(null); + } else { + // retry again after pauseTime. 
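+                // illustrative numbers (an assumption, not part of this patch): with the
+                // default hbase.client.pause of 100 ms and maxAttempts = 10 against a
+                // 300000 ms expected timeout, getPauseTime scales the base pause by the
+                // retry backoff table (waits grow roughly 100, 200, 300, 500, 1000... ms),
+                // while maxPauseTime = 300000 / 10 = 30000 ms caps any single wait below.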
+ long pauseTime = ConnectionUtils + .getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); + pauseTime = Math.min(pauseTime, maxPauseTime); + AsyncConnectionImpl.RETRY_TIMER + .newTimeout(this, pauseTime, TimeUnit.MILLISECONDS); + } + }); + } else { + future.completeExceptionally(new SnapshotCreationException( + "Snapshot '" + snapshot.getName() + "' wasn't completed in expectedTime:" + + expectedTimeout + " ms", snapshot)); + } + } + }; + AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); + } + } + @Override public CompletableFuture isSnapshotFinished(SnapshotDescription snapshot) { return this @@ -2800,6 +2816,18 @@ String getOperationType() { } } + private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer { + SnapshotProcedureBiConsumer(TableName tableName) { + super(tableName); + } + + @Override + String getOperationType() { + return "SNAPSHOT"; + } + } + + private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer { private final String peerId; private final Supplier getOperation; diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto index dce0e68b2db9..94a434755cff 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto @@ -437,10 +437,13 @@ message IsCleanerChoreEnabledResponse { message SnapshotRequest { required SnapshotDescription snapshot = 1; + optional uint64 nonce_group = 2 [default = 0]; + optional uint64 nonce = 3 [default = 0]; } message SnapshotResponse { required int64 expected_timeout = 1; + optional int64 proc_id = 2; } message GetCompletedSnapshotsRequest { diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto index 7795e2edf5f7..35125a5a94e6 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto @@ -191,6 +191,45 @@ message RestoreParentToChildRegionsPair { required string child2_region_name = 3; } +enum SnapshotState { + SNAPSHOT_PREPARE = 1; + SNAPSHOT_PRE_OPERATION = 2; + SNAPSHOT_WRITE_SNAPSHOT_INFO = 3; + SNAPSHOT_SNAPSHOT_ONLINE_REGIONS = 4; + SNAPSHOT_SNAPSHOT_SPLIT_REGIONS = 5; + SNAPSHOT_SNAPSHOT_CLOSED_REGIONS = 6; + SNAPSHOT_SNAPSHOT_MOB_REGION = 7; + SNAPSHOT_CONSOLIDATE_SNAPSHOT = 8; + SNAPSHOT_VERIFIER_SNAPSHOT = 9; + SNAPSHOT_COMPLETE_SNAPSHOT = 10; + SNAPSHOT_POST_OPERATION = 11; +} + +message SnapshotProcedureStateData { + required SnapshotDescription snapshot = 1; +} + +message SnapshotRegionProcedureStateData { + required RegionInfo region = 1; + required SnapshotDescription snapshot = 2; +} + +message SnapshotRegionParameter { + required RegionInfo region = 1; + required SnapshotDescription snapshot = 2; +} + +message SnapshotVerifyProcedureStateData { + required SnapshotDescription snapshot = 1; + required RegionInfo region = 2; + optional ServerName target_server = 3; +} + +message SnapshotVerifyParameter { + required SnapshotDescription snapshot = 1; + required RegionInfo region = 2; +} + enum CloneSnapshotState { CLONE_SNAPSHOT_PRE_OPERATION = 1; CLONE_SNAPSHOT_WRITE_FS_LAYOUT = 2; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java 
index a39493cc2628..0b608be369a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -301,7 +301,21 @@ public enum EventType { * * RS_CLAIM_REPLICATION_QUEUE */ - RS_CLAIM_REPLICATION_QUEUE(86, ExecutorType.RS_CLAIM_REPLICATION_QUEUE); + RS_CLAIM_REPLICATION_QUEUE(86, ExecutorType.RS_CLAIM_REPLICATION_QUEUE), + + /** + * RS snapshot regions.
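+   * Dispatched by the master's SnapshotRegionProcedure and handled on the region
+   * server by SnapshotRegionCallable.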
+ * + * RS_SNAPSHOT_REGIONS + */ + RS_SNAPSHOT_REGIONS(87, ExecutorType.RS_SNAPSHOT_OPERATIONS), + + /** + * RS verify snapshot.
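+   * Note that both snapshot event types map to the single RS_SNAPSHOT_OPERATIONS
+   * executor, so region-snapshot and snapshot-verify tasks on a region server share
+   * one thread pool.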
+ * + * RS_VERIFY_SNAPSHOT + */ + RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS); private final int code; private final ExecutorType executor; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index 120f9bea5b7b..cbecb3e8619f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -53,7 +53,8 @@ public enum ExecutorType { RS_REPLAY_SYNC_REPLICATION_WAL(32), RS_SWITCH_RPC_THROTTLE(33), RS_IN_MEMORY_COMPACTION(34), - RS_CLAIM_REPLICATION_QUEUE(35); + RS_CLAIM_REPLICATION_QUEUE(35), + RS_SNAPSHOT_OPERATIONS(36); ExecutorType(int value) { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java index 8037fa173c37..1ff7a667553d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java @@ -1092,8 +1092,8 @@ public void call(MasterObserver observer) throws IOException { } public void preSnapshot(final SnapshotDescription snapshot, - final TableDescriptor hTableDescriptor) throws IOException { - execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { + final TableDescriptor hTableDescriptor, final User user) throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) { @Override public void call(MasterObserver observer) throws IOException { observer.preSnapshot(this, snapshot, hTableDescriptor); @@ -1102,8 +1102,8 @@ public void call(MasterObserver observer) throws IOException { } public void postSnapshot(final SnapshotDescription snapshot, - final TableDescriptor hTableDescriptor) throws IOException { - execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { + final TableDescriptor hTableDescriptor, final User user) throws IOException { + execOperation(coprocEnvironments.isEmpty() ? 
null : new MasterObserverOperation(user) { @Override public void call(MasterObserver observer) throws IOException { observer.postSnapshot(this, snapshot, hTableDescriptor); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index d53a27161f55..abfc45bcf1a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -1736,12 +1736,25 @@ public SnapshotResponse snapshot(RpcController controller, // get the snapshot information SnapshotDescription snapshot = SnapshotDescriptionUtils.validate( request.getSnapshot(), server.getConfiguration()); - server.snapshotManager.takeSnapshot(snapshot); - // send back the max amount of time the client should wait for the snapshot to complete long waitTime = SnapshotDescriptionUtils.getMaxMasterTimeout(server.getConfiguration(), snapshot.getType(), SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME); - return SnapshotResponse.newBuilder().setExpectedTimeout(waitTime).build(); + + SnapshotResponse.Builder builder = SnapshotResponse.newBuilder().setExpectedTimeout(waitTime); + + // If there is nonce group and nonce in the snapshot request, then the client can + // handle snapshot procedure procId. And if enable the snapshot procedure, we + // will do the snapshot work with proc-v2, otherwise we will fall back to zk proc. + if (request.hasNonceGroup() && request.hasNonce() && + server.snapshotManager.snapshotProcedureEnabled()) { + long nonceGroup = request.getNonceGroup(); + long nonce = request.getNonce(); + long procId = server.snapshotManager.takeSnapshot(snapshot, nonceGroup, nonce); + return builder.setProcId(procId).build(); + } else { + server.snapshotManager.takeSnapshot(snapshot); + return builder.build(); + } } catch (ForeignException e) { throw new ServiceException(e.getCause()); } catch (IOException e) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java index 6db094c4e6df..d9eea26e0190 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java @@ -23,9 +23,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; @@ -73,7 +71,7 @@ public class SplitWALManager { private static final Logger LOG = LoggerFactory.getLogger(SplitWALManager.class); private final MasterServices master; - private final SplitWorkerAssigner splitWorkerAssigner; + private final WorkerAssigner splitWorkerAssigner; private final Path rootDir; private final FileSystem fs; private final Configuration conf; @@ -82,8 +80,9 @@ public class SplitWALManager { public SplitWALManager(MasterServices master) throws IOException { this.master = master; this.conf = master.getConfiguration(); - this.splitWorkerAssigner = new SplitWorkerAssigner(this.master, - conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER)); + this.splitWorkerAssigner = new WorkerAssigner(this.master, + conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER), + new 
ProcedureEvent<>("split-WAL-worker-assigning")); this.rootDir = master.getMasterFileSystem().getWALRootDir(); this.fs = master.getMasterFileSystem().getWALFileSystem(); this.walArchiveDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME); @@ -189,67 +188,4 @@ public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler sc public void addUsedSplitWALWorker(ServerName worker){ splitWorkerAssigner.addUsedWorker(worker); } - - /** - * help assign and release a worker for each WAL splitting task - * For each worker, concurrent running splitting task should be no more than maxSplitTasks - * If a task failed to acquire a worker, it will suspend and wait for workers available - * - */ - private static final class SplitWorkerAssigner implements ServerListener { - private int maxSplitTasks; - private final ProcedureEvent event; - private Map currentWorkers = new HashMap<>(); - private MasterServices master; - - public SplitWorkerAssigner(MasterServices master, int maxSplitTasks) { - this.maxSplitTasks = maxSplitTasks; - this.master = master; - this.event = new ProcedureEvent<>("split-WAL-worker-assigning"); - // ServerManager might be null in a test context where we are mocking; allow for this - ServerManager sm = this.master.getServerManager(); - if (sm != null) { - sm.registerListener(this); - } - } - - public synchronized Optional acquire() { - List serverList = master.getServerManager().getOnlineServersList(); - Collections.shuffle(serverList); - Optional worker = serverList.stream().filter( - serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0) - .findAny(); - if (worker.isPresent()) { - currentWorkers.compute(worker.get(), (serverName, - availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1); - } - return worker; - } - - public synchronized void release(ServerName serverName) { - currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1); - } - - public void suspend(Procedure proc) { - event.suspend(); - event.suspendIfNotReady(proc); - } - - public void wake(MasterProcedureScheduler scheduler) { - if (!event.isReady()) { - event.wake(scheduler); - } - } - - @Override - public void serverAdded(ServerName worker) { - this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); - } - - public synchronized void addUsedWorker(ServerName worker) { - // load used worker when master restart - currentWorkers.compute(worker, (serverName, - availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1); - } - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java new file mode 100644 index 000000000000..bc050524ad13 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * help assign and release a worker for each remote task. + * For each worker, concurrent running task should be no more than maxTasks. + * If a task failed to acquire a worker, it will suspend and wait for workers available. + */ +@InterfaceAudience.Private +public class WorkerAssigner implements ServerListener { + private final Map currentWorkers = new HashMap<>(); + private final MasterServices master; + private final ProcedureEvent event; + private final int maxTasks; + + public WorkerAssigner(MasterServices master, int maxTasks, ProcedureEvent event) { + this.maxTasks = maxTasks; + this.master = master; + this.event = event; + // ServerManager might be null in a test context where we are mocking; allow for this + ServerManager sm = this.master.getServerManager(); + if (sm != null) { + sm.registerListener(this); + } + } + + public synchronized Optional acquire() { + List serverList = master.getServerManager().getOnlineServersList(); + Collections.shuffle(serverList); + Optional worker = serverList.stream().filter( + serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0) + .findAny(); + worker.ifPresent(name -> currentWorkers.compute(name, (serverName, availableWorker) -> + availableWorker == null ? maxTasks - 1 : availableWorker - 1)); + return worker; + } + + public synchronized void release(ServerName serverName) { + currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1); + } + + public void suspend(Procedure proc) { + event.suspend(); + event.suspendIfNotReady(proc); + } + + public void wake(MasterProcedureScheduler scheduler) { + if (!event.isReady()) { + event.wake(scheduler); + } + } + + @Override + public void serverAdded(ServerName worker) { + this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); + } + + public synchronized void addUsedWorker(ServerName worker) { + // load used worker when master restart + currentWorkers.compute(worker, (serverName, + availableWorker) -> availableWorker == null ? 
maxTasks - 1 : availableWorker - 1); + } + + public Integer getAvailableWorker(ServerName serverName) { + return currentWorkers.get(serverName); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java index a7abfdc13f5b..16d45f2e3f9e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java @@ -50,7 +50,12 @@ public enum ServerOperationType { /** * send the claim replication queue request to region server to actually assign it */ - CLAIM_REPLICATION_QUEUE_REMOTE + CLAIM_REPLICATION_QUEUE_REMOTE, + + /** + * send verify snapshot request to region server and handle the response + */ + VERIFY_SNAPSHOT } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java index 726ee14c979f..2cbceb22e514 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java @@ -40,6 +40,7 @@ public boolean requireExclusiveLock(Procedure proc) { case SPLIT_WAL_REMOTE: case CLAIM_REPLICATION_QUEUES: case CLAIM_REPLICATION_QUEUE_REMOTE: + case VERIFY_SNAPSHOT: return false; default: break; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotProcedure.java new file mode 100644 index 000000000000..769937efc6af --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotProcedure.java @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.client.TableState; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.MetricsSnapshot; +import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure; +import org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure; +import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier; +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotManifest; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.ModifyRegionUtils; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotProcedureStateData; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; + +/** + * A procedure used to take snapshot on tables. 
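+ * <p>
+ * It drives the SnapshotState machine declared in MasterProcedure.proto: prepare,
+ * pre-operation, write snapshot info, snapshot online regions (as remote
+ * SnapshotRegionProcedure children), snapshot split regions, snapshot closed regions,
+ * snapshot the mob region, consolidate the manifest, verify, complete, post-operation.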
+ */ +@InterfaceAudience.Private +public class SnapshotProcedure + extends AbstractStateMachineTableProcedure { + private static final Logger LOG = LoggerFactory.getLogger(SnapshotProcedure.class); + private final MetricsSnapshot metricsSnapshot = new MetricsSnapshot(); + + private Configuration conf; + private SnapshotDescription snapshot; + private Path rootDir; + private Path snapshotDir; + private Path workingDir; + private FileSystem workingDirFS; + private FileSystem rootFs; + private TableName snapshotTable; + private MonitoredTask status; + private SnapshotManifest snapshotManifest; + private TableDescriptor htd; + + private RetryCounter retryCounter; + + public SnapshotProcedure() { } + + public SnapshotProcedure(final MasterProcedureEnv env, final SnapshotDescription snapshot) { + super(env); + this.snapshot = snapshot; + } + + @Override + public TableName getTableName() { + return TableName.valueOf(snapshot.getTable()); + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.SNAPSHOT; + } + + @Override + protected LockState acquireLock(MasterProcedureEnv env) { + // AbstractStateMachineTableProcedure acquires exclusive table lock by default, + // but we may need to downgrade it to shared lock for some reasons: + // a. exclusive lock has a negative effect on assigning region. See HBASE-21480 for details. + // b. we want to support taking multiple different snapshots on same table on the same time. + if (env.getProcedureScheduler().waitTableSharedLock(this, getTableName())) { + return LockState.LOCK_EVENT_WAIT; + } + return LockState.LOCK_ACQUIRED; + } + + @Override + protected void releaseLock(MasterProcedureEnv env) { + env.getProcedureScheduler().wakeTableSharedLock(this, getTableName()); + } + + @Override + protected boolean holdLock(MasterProcedureEnv env) { + // In order to avoid enabling/disabling/modifying/deleting table during snapshot, + // we don't release lock during suspend + return true; + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, SnapshotState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + LOG.info("{} execute state={}", this, state); + + try { + switch (state) { + case SNAPSHOT_PREPARE: + prepareSnapshot(env); + setNextState(SnapshotState.SNAPSHOT_PRE_OPERATION); + return Flow.HAS_MORE_STATE; + case SNAPSHOT_PRE_OPERATION: + preSnapshot(env); + setNextState(SnapshotState.SNAPSHOT_WRITE_SNAPSHOT_INFO); + return Flow.HAS_MORE_STATE; + case SNAPSHOT_WRITE_SNAPSHOT_INFO: + SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, workingDirFS); + TableState tableState = + env.getMasterServices().getTableStateManager().getTableState(snapshotTable); + if (tableState.isEnabled()) { + setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS); + } else if (tableState.isDisabled()) { + setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_CLOSED_REGIONS); + } + return Flow.HAS_MORE_STATE; + case SNAPSHOT_SNAPSHOT_ONLINE_REGIONS: + addChildProcedure(createRemoteSnapshotProcedures(env)); + setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_SPLIT_REGIONS); + return Flow.HAS_MORE_STATE; + case SNAPSHOT_SNAPSHOT_SPLIT_REGIONS: + snapshotSplitRegions(env); + setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION); + return Flow.HAS_MORE_STATE; + case SNAPSHOT_SNAPSHOT_CLOSED_REGIONS: + snapshotClosedRegions(env); + setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION); + return Flow.HAS_MORE_STATE; + case SNAPSHOT_SNAPSHOT_MOB_REGION: + snapshotMobRegion(env); 
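+          // the mob region is a synthetic region (MobUtils.getMobRegionInfo) holding
+          // only MOB data, so the master references it in the manifest directly
+          // instead of dispatching a remote procedure to a region server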
+ setNextState(SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT); + return Flow.HAS_MORE_STATE; + case SNAPSHOT_CONSOLIDATE_SNAPSHOT: + // flush the in-memory state, and write the single manifest + status.setStatus("Consolidate snapshot: " + snapshot.getName()); + snapshotManifest.consolidate(); + setNextState(SnapshotState.SNAPSHOT_VERIFIER_SNAPSHOT); + return Flow.HAS_MORE_STATE; + case SNAPSHOT_VERIFIER_SNAPSHOT: + status.setStatus("Verifying snapshot: " + snapshot.getName()); + verifySnapshot(env); + setNextState(SnapshotState.SNAPSHOT_COMPLETE_SNAPSHOT); + return Flow.HAS_MORE_STATE; + case SNAPSHOT_COMPLETE_SNAPSHOT: + if (isSnapshotCorrupted()) { + throw new CorruptedSnapshotException(snapshot.getName()); + } + completeSnapshot(env); + setNextState(SnapshotState.SNAPSHOT_POST_OPERATION); + return Flow.HAS_MORE_STATE; + case SNAPSHOT_POST_OPERATION: + postSnapshot(env); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } catch (ProcedureSuspendedException e) { + throw e; + } catch (Exception e) { + setFailure("master-snapshot", e); + LOG.warn("unexpected exception while execute {}. Mark procedure Failed.", this, e); + status.abort("Abort Snapshot " + snapshot.getName() + " on Table " + snapshotTable); + return Flow.NO_MORE_STATE; + } + } + + @Override + protected void rollbackState(MasterProcedureEnv env, SnapshotState state) + throws IOException, InterruptedException { + if (state == SnapshotState.SNAPSHOT_PRE_OPERATION) { + try { + if (!workingDirFS.delete(workingDir, true)) { + LOG.error("Couldn't delete snapshot working directory {}", workingDir); + } + } catch (IOException e) { + LOG.error("Couldn't delete snapshot working directory {}", workingDir, e); + } + } + } + + @Override + protected boolean isRollbackSupported(SnapshotState state) { + return true; + } + + @Override + protected SnapshotState getState(final int stateId) { + return SnapshotState.forNumber(stateId); + } + + @Override + protected int getStateId(SnapshotState state) { + return state.getNumber(); + } + + @Override + protected SnapshotState getInitialState() { + return SnapshotState.SNAPSHOT_PREPARE; + } + + private void prepareSnapshot(MasterProcedureEnv env) + throws ProcedureSuspendedException, IOException { + if (isAnySplitOrMergeProcedureRunning(env)) { + if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration()); + } + long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); + LOG.warn("{} waits {} ms for Split/Merge procedure to finish", this, backoff); + setTimeout(Math.toIntExact(backoff)); + setState(ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + throw new ProcedureSuspendedException(); + } + prepareSnapshotEnv(env); + } + + private void prepareSnapshotEnv(MasterProcedureEnv env) throws IOException { + this.conf = env.getMasterConfiguration(); + this.snapshotTable = TableName.valueOf(snapshot.getTable()); + this.htd = loadTableDescriptorSnapshot(env); + this.rootFs = env.getMasterFileSystem().getFileSystem(); + this.rootDir = CommonFSUtils.getRootDir(conf); + this.snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir); + this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, conf); + this.workingDirFS = workingDir.getFileSystem(conf); + this.status = TaskMonitor.get() + .createStatus("Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable); + ForeignExceptionDispatcher monitor = new 
ForeignExceptionDispatcher(snapshot.getName()); + this.snapshotManifest = SnapshotManifest.create(conf, + rootFs, workingDir, snapshot, monitor, status); + this.snapshotManifest.addTableDescriptor(htd); + } + + @Override + protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { + setState(ProcedureState.RUNNABLE); + env.getProcedureScheduler().addFront(this); + return false; + } + + private boolean isAnySplitOrMergeProcedureRunning(MasterProcedureEnv env) { + return env.getMasterServices().getMasterProcedureExecutor().getProcedures().stream() + .filter(p -> !p.isFinished()) + .filter(p -> p instanceof SplitTableRegionProcedure || + p instanceof MergeTableRegionsProcedure) + .anyMatch(p -> ((AbstractStateMachineTableProcedure) p) + .getTableName().equals(getTableName())); + } + + private TableDescriptor loadTableDescriptorSnapshot(MasterProcedureEnv env) throws IOException { + TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(snapshotTable); + if (htd == null) { + throw new IOException("TableDescriptor missing for " + snapshotTable); + } + if (htd.getMaxFileSize() == -1 && this.snapshot.getMaxFileSize() > 0) { + return TableDescriptorBuilder.newBuilder(htd).setValue(TableDescriptorBuilder.MAX_FILESIZE, + Long.toString(this.snapshot.getMaxFileSize())).build(); + } + return htd; + } + + private void preSnapshot(MasterProcedureEnv env) throws IOException { + env.getMasterServices().getSnapshotManager().prepareWorkingDirectory(snapshot); + + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.preSnapshot(ProtobufUtil.createSnapshotDesc(snapshot), htd, getUser()); + } + } + + private void postSnapshot(MasterProcedureEnv env) throws IOException { + SnapshotManager sm = env.getMasterServices().getSnapshotManager(); + if (sm != null) { + sm.unregisterSnapshotProcedure(snapshot, getProcId()); + } + + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.postSnapshot(ProtobufUtil.createSnapshotDesc(snapshot), htd, getUser()); + } + } + + private void verifySnapshot(MasterProcedureEnv env) throws IOException { + int verifyThreshold = env.getMasterConfiguration() + .getInt("hbase.snapshot.remote.verify.threshold", 10000); + List regions = env.getAssignmentManager() + .getTableRegions(snapshotTable, false) + .stream().filter(r -> RegionReplicaUtil.isDefaultReplica(r)).collect(Collectors.toList()); + int numRegions = regions.size(); + + MasterSnapshotVerifier verifier = + new MasterSnapshotVerifier(env.getMasterServices(), snapshot, workingDirFS); + if (numRegions >= verifyThreshold) { + verifier.verifySnapshot(workingDir, false); + addChildProcedure(regions.stream() + .map(r -> new SnapshotVerifyProcedure(snapshot, r)) + .toArray(SnapshotVerifyProcedure[]::new)); + } else { + verifier.verifySnapshot(workingDir, true); + } + } + + private void completeSnapshot(MasterProcedureEnv env) throws IOException { + // complete the snapshot, atomically moving from tmp to .snapshot dir. + SnapshotDescriptionUtils.completeSnapshot(snapshotDir, workingDir, + env.getMasterFileSystem().getFileSystem(), workingDirFS, conf); + // update metric. 
when master restarts, the metric value is wrong + metricsSnapshot.addSnapshot(status.getCompletionTimestamp() - status.getStartTime()); + if (env.getMasterCoprocessorHost() != null) { + env.getMasterCoprocessorHost() + .postCompletedSnapshotAction(ProtobufUtil.createSnapshotDesc(snapshot), htd); + } + status.markComplete("Snapshot " + snapshot.getName() + " completed"); + } + + private void snapshotSplitRegions(MasterProcedureEnv env) throws IOException { + List regions = getDefaultRegionReplica(env) + .filter(RegionInfo::isSplit).collect(Collectors.toList()); + snapshotSplitOrClosedRegions(env, regions, "SplitRegionsSnapshotPool"); + } + + private void snapshotClosedRegions(MasterProcedureEnv env) throws IOException { + List regions = getDefaultRegionReplica(env).collect(Collectors.toList()); + snapshotSplitOrClosedRegions(env, regions, "ClosedRegionsSnapshotPool"); + } + + private Stream getDefaultRegionReplica(MasterProcedureEnv env) { + return env.getAssignmentManager().getTableRegions(snapshotTable, false) + .stream().filter(r -> RegionReplicaUtil.isDefaultReplica(r)); + } + + private void snapshotSplitOrClosedRegions(MasterProcedureEnv env, + List regions, String threadPoolName) throws IOException { + ThreadPoolExecutor exec = SnapshotManifest + .createExecutor(env.getMasterConfiguration(), threadPoolName); + try { + ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() { + @Override + public void editRegion(final RegionInfo region) throws IOException { + snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region); + LOG.info("take snapshot region={}, table={}", region, snapshotTable); + } + }); + } finally { + exec.shutdown(); + } + status.setStatus("Completed referencing closed/split regions of table: " + snapshotTable); + } + + private void snapshotMobRegion(MasterProcedureEnv env) throws IOException { + if (!MobUtils.hasMobColumns(htd)) { + return; + } + ThreadPoolExecutor exec = SnapshotManifest + .createExecutor(env.getMasterConfiguration(), "MobRegionSnapshotPool"); + RegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(htd.getTableName()); + try { + ModifyRegionUtils.editRegions(exec, Collections.singleton(mobRegionInfo), + new ModifyRegionUtils.RegionEditTask() { + @Override + public void editRegion(final RegionInfo region) throws IOException { + snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region); + } + }); + } finally { + exec.shutdown(); + } + status.setStatus("Completed referencing HFiles for the mob region of table: " + snapshotTable); + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + serializer.serialize(SnapshotProcedureStateData + .newBuilder().setSnapshot(this.snapshot).build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + SnapshotProcedureStateData data = serializer.deserialize(SnapshotProcedureStateData.class); + this.snapshot = data.getSnapshot(); + } + + private Procedure[] createRemoteSnapshotProcedures(MasterProcedureEnv env) { + return env.getAssignmentManager().getTableRegions(snapshotTable, true) + .stream().filter(r -> RegionReplicaUtil.isDefaultReplica(r)) + .map(r -> new SnapshotRegionProcedure(snapshot, r)) + .toArray(SnapshotRegionProcedure[]::new); + } + + @Override + public void toStringClassDetails(StringBuilder builder) { + 
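+    // renders, for example (format approximate, ids and names hypothetical):
+    // "...SnapshotProcedure, id=42, snapshot={ ss=snap1 table=t1 type=FLUSH }"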
builder.append(getClass().getName()) + .append(", id=").append(getProcId()) + .append(", snapshot=").append(ClientSnapshotDescriptionUtils.toString(snapshot)); + } + + public SnapshotDescription getSnapshotDesc() { + return snapshot; + } + + @Override + protected void afterReplay(MasterProcedureEnv env) { + if (getCurrentState() == getInitialState()) { + // if we are in the initial state, it is unnecessary to call prepareSnapshotEnv(). + return; + } + try { + prepareSnapshotEnv(env); + boolean snapshotProcedureEnabled = conf.getBoolean(SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED, + SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED_DEFAULT); + if (!snapshotProcedureEnabled) { + throw new IOException("SnapshotProcedure is DISABLED"); + } + } catch (IOException e) { + LOG.error("Failed replaying {}, mark procedure as FAILED", this, e); + setFailure("master-snapshot", e); + } + } + + public SnapshotDescription getSnapshot() { + return snapshot; + } + + public synchronized void markSnapshotCorrupted() throws IOException { + Path flagFile = SnapshotDescriptionUtils.getCorruptedFlagFileForSnapshot(workingDir); + if (!workingDirFS.exists(flagFile)) { + workingDirFS.create(flagFile).close(); + LOG.info("touch corrupted snapshot flag file {} for {}", flagFile, snapshot.getName()); + } + } + + public boolean isSnapshotCorrupted() throws IOException { + return workingDirFS.exists(SnapshotDescriptionUtils + .getCorruptedFlagFileForSnapshot(workingDir)); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotRegionProcedure.java new file mode 100644 index 000000000000..9e5040601810 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotRegionProcedure.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import com.google.errorprone.annotations.RestrictedApi;
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
+import org.apache.hadoop.hbase.master.assignment.ServerState;
+import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
+import org.apache.hadoop.hbase.regionserver.SnapshotRegionCallable;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotRegionProcedureStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
+
+/**
+ * A remote procedure used to send a region snapshot request to a region server.
+ * The basic logic of SnapshotRegionProcedure is similar to {@link ServerRemoteProcedure},
+ * with one small difference: when a {@link FailedRemoteDispatchException} is thrown,
+ * SnapshotRegionProcedure sleeps for a while and keeps retrying until it succeeds.
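+ * Unlike SnapshotVerifyProcedure, the target server is not picked by a worker
+ * assigner: the region must be snapshotted on the server that currently holds it
+ * OPEN, so on failure the procedure suspends with backoff and waits for the region
+ * to come back online instead of choosing another server.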
+ */ +@InterfaceAudience.Private +public class SnapshotRegionProcedure extends Procedure + implements TableProcedureInterface, RemoteProcedure { + private static final Logger LOG = LoggerFactory.getLogger(SnapshotRegionProcedure.class); + + private SnapshotDescription snapshot; + private ProcedureEvent event; + private RegionInfo region; + private boolean dispatched; + private boolean succ; + private RetryCounter retryCounter; + + public SnapshotRegionProcedure() { + } + + public SnapshotRegionProcedure(SnapshotDescription snapshot, RegionInfo region) { + this.snapshot = snapshot; + this.region = region; + } + + @Override + protected LockState acquireLock(final MasterProcedureEnv env) { + if (env.getProcedureScheduler().waitRegions(this, getTableName(), region)) { + return LockState.LOCK_EVENT_WAIT; + } + return LockState.LOCK_ACQUIRED; + } + + @Override + protected void releaseLock(final MasterProcedureEnv env) { + env.getProcedureScheduler().wakeRegions(this, getTableName(), region); + } + + @Override + protected boolean holdLock(MasterProcedureEnv env) { + return false; + } + + @Override + public Optional remoteCallBuild(MasterProcedureEnv env, ServerName serverName) { + return Optional.of(new RSProcedureDispatcher.ServerOperation(this, getProcId(), + SnapshotRegionCallable.class, MasterProcedureProtos.SnapshotRegionParameter.newBuilder() + .setRegion(ProtobufUtil.toRegionInfo(region)).setSnapshot(snapshot).build().toByteArray())); + } + + @Override + public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, IOException e) { + complete(env, e); + } + + @Override + public void remoteOperationCompleted(MasterProcedureEnv env) { + complete(env, null); + } + + @Override + public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException e) { + complete(env, e); + } + + // keep retrying until success + private void complete(MasterProcedureEnv env, Throwable error) { + if (isFinished()) { + LOG.info("This procedure {} is already finished, skip the rest processes", this.getProcId()); + return; + } + if (event == null) { + LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery", + getProcId()); + return; + } + if (error == null) { + LOG.info("finish snapshot {} on region {}", snapshot.getName(), region.getEncodedName()); + succ = true; + } + + event.wake(env.getProcedureScheduler()); + event = null; + } + + @Override + public TableName getTableName() { + return region.getTable(); + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.REGION_SNAPSHOT; + } + + @Override + protected Procedure[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + if (dispatched) { + if (succ) { + return null; + } + dispatched = false; + } + + RegionStates regionStates = env.getAssignmentManager().getRegionStates(); + RegionStateNode regionNode = regionStates.getRegionStateNode(region); + regionNode.lock(); + try { + if (regionNode.getProcedure() != null) { + setTimeoutForSuspend(env, String.format("region %s has a TRSP attached %s", + region.getRegionNameAsString(), regionNode.getProcedure())); + throw new ProcedureSuspendedException(); + } + if (!regionNode.isInState(RegionState.State.OPEN)) { + setTimeoutForSuspend(env, String.format("region state of %s is %s", + region.getRegionNameAsString(), regionNode.getState())); + throw new ProcedureSuspendedException(); + } + ServerName targetServer = regionNode.getRegionLocation(); + if 
(targetServer == null) { + setTimeoutForSuspend(env, String.format("target server of region %s is null", + region.getRegionNameAsString())); + throw new ProcedureSuspendedException(); + } + ServerState serverState = regionStates.getServerNode(targetServer).getState(); + if (serverState != ServerState.ONLINE) { + setTimeoutForSuspend(env, String.format("target server of region %s %s is in state %s", + region.getRegionNameAsString(), targetServer, serverState)); + throw new ProcedureSuspendedException(); + } + try { + env.getRemoteDispatcher().addOperationToNode(targetServer, this); + dispatched = true; + event = new ProcedureEvent<>(this); + event.suspendIfNotReady(this); + throw new ProcedureSuspendedException(); + } catch (FailedRemoteDispatchException e) { + setTimeoutForSuspend(env, "Failed send request to " + targetServer); + throw new ProcedureSuspendedException(); + } + } finally { + regionNode.unlock(); + } + } + + @Override + protected void rollback(MasterProcedureEnv env) { + throw new UnsupportedOperationException(); + } + + private void setTimeoutForSuspend(MasterProcedureEnv env, String reason) { + if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration()); + } + long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); + LOG.warn("{} can not run currently because {}, wait {} ms to retry", this, reason, backoff); + setTimeout(Math.toIntExact(backoff)); + setState(ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + } + + @Override + protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { + setState(ProcedureState.RUNNABLE); + env.getProcedureScheduler().addFront(this); + return false; + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + SnapshotRegionProcedureStateData.Builder builder = + SnapshotRegionProcedureStateData.newBuilder(); + builder.setSnapshot(snapshot); + builder.setRegion(ProtobufUtil.toRegionInfo(region)); + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + SnapshotRegionProcedureStateData data = serializer.deserialize( + SnapshotRegionProcedureStateData.class); + this.snapshot = data.getSnapshot(); + this.region = ProtobufUtil.toRegionInfo(data.getRegion()); + } + + @Override + public String getProcName() { + return getClass().getSimpleName() + " " + region.getEncodedName(); + } + + @Override + protected void toStringClassDetails(StringBuilder builder) { + builder.append(getProcName()); + } + + @Override + protected boolean waitInitialized(MasterProcedureEnv env) { + return env.waitInitialized(this); + } + + public RegionInfo getRegion() { + return region; + } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*(/src/test/.*|TestSnapshotProcedure).java") + boolean inRetrying() { + return retryCounter != null; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java new file mode 100644 index 000000000000..6f7ef2ec8477 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
+import org.apache.hadoop.hbase.regionserver.SnapshotVerifyCallable;
+import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyParameter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyProcedureStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
+
+/**
+ * A remote procedure used to send a snapshot verify request to a region server.
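+ * The target server is acquired from the SnapshotManager's verify-worker assigner;
+ * if the remote server reports a CorruptedSnapshotException, the parent
+ * SnapshotProcedure touches the corrupted-snapshot flag file and the snapshot
+ * later fails in its SNAPSHOT_COMPLETE_SNAPSHOT step.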
+ */
+@InterfaceAudience.Private
+public class SnapshotVerifyProcedure
+    extends ServerRemoteProcedure implements TableProcedureInterface {
+  private static final Logger LOG = LoggerFactory.getLogger(SnapshotVerifyProcedure.class);
+
+  private SnapshotDescription snapshot;
+  private RegionInfo region;
+
+  private RetryCounter retryCounter;
+
+  public SnapshotVerifyProcedure() {}
+
+  public SnapshotVerifyProcedure(SnapshotDescription snapshot, RegionInfo region) {
+    this.snapshot = snapshot;
+    this.region = region;
+  }
+
+  @Override
+  protected void rollback(MasterProcedureEnv env) {
+    // nothing to rollback
+  }
+
+  @Override
+  protected boolean abort(MasterProcedureEnv env) {
+    return false;
+  }
+
+  @Override
+  protected synchronized void complete(MasterProcedureEnv env, Throwable error) {
+    try {
+      if (error != null) {
+        if (error instanceof RemoteProcedureException) {
+          // remote operation failed
+          Throwable remoteEx = unwrapRemoteProcedureException((RemoteProcedureException) error);
+          if (remoteEx instanceof CorruptedSnapshotException) {
+            // the snapshot is corrupted; touch a flag file and finish the procedure
+            succ = true;
+            SnapshotProcedure parent = env.getMasterServices().getMasterProcedureExecutor()
+              .getProcedure(SnapshotProcedure.class, getParentProcId());
+            if (parent != null) {
+              parent.markSnapshotCorrupted();
+            }
+          } else {
+            // unexpected exception on the remote server; retry on other servers
+            succ = false;
+          }
+        } else {
+          // most likely the remote call itself failed; retry on other servers
+          succ = false;
+        }
+      } else {
+        // remote operation finished without error
+        succ = true;
+      }
+    } catch (IOException e) {
+      // if we can't create the flag file, mark the current procedure as FAILED
+      // and roll back the whole snapshot procedure stack.
+      LOG.warn("Failed to create corrupted snapshot flag file for snapshot={}, region={}",
+        snapshot.getName(), region, e);
+      setFailure("verify-snapshot", e);
+    } finally {
+      // release the worker
+      env.getMasterServices().getSnapshotManager()
+        .releaseSnapshotVerifyWorker(this, targetServer, env.getProcedureScheduler());
+    }
+  }
+
+  // the remote exception is wrapped in a RemoteProcedureException;
+  // here we try to unwrap it
+  private Throwable unwrapRemoteProcedureException(RemoteProcedureException e) {
+    return e.getCause();
+  }
+
+  @Override
+  protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+    try {
+      // if we already know the snapshot is corrupted, stop scheduling
+      // new procedures as well as the undispatched ones
+      if (!dispatched) {
+        SnapshotProcedure parent = env.getMasterServices().getMasterProcedureExecutor()
+          .getProcedure(SnapshotProcedure.class, getParentProcId());
+        if (parent != null && parent.isSnapshotCorrupted()) {
+          return null;
+        }
+      }
+      // acquire a worker
+      if (!dispatched && targetServer == null) {
+        targetServer = env.getMasterServices()
+          .getSnapshotManager().acquireSnapshotVerifyWorker(this);
+      }
+      // send the remote request
+      Procedure<MasterProcedureEnv>[] res = super.execute(env);
+      // retry if necessary
+      if (!dispatched) {
+        // most likely a FailedRemoteDispatchException was thrown;
+        // we need to retry on another remote server
+        targetServer = null;
+        throw new FailedRemoteDispatchException("Failed to send request");
+      } else {
+        // the request was successfully dispatched
+        return res;
+      }
+    } catch (IOException e) {
+      // there are some cases where we need to retry:
+      // 1.
we can't get response from hdfs + // 2. the remote server crashed + if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration()); + } + long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); + LOG.warn("Failed to get snapshot verify result , wait {} ms to retry", backoff, e); + setTimeout(Math.toIntExact(backoff)); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + throw new ProcedureSuspendedException(); + } + } + + @Override + protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { + setState(ProcedureProtos.ProcedureState.RUNNABLE); + env.getProcedureScheduler().addFront(this); + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + SnapshotVerifyProcedureStateData.Builder builder = + SnapshotVerifyProcedureStateData.newBuilder(); + builder.setSnapshot(snapshot).setRegion(ProtobufUtil.toRegionInfo(region)); + if (targetServer != null) { + builder.setTargetServer(ProtobufUtil.toServerName(targetServer)); + } + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + SnapshotVerifyProcedureStateData data = + serializer.deserialize(SnapshotVerifyProcedureStateData.class); + this.snapshot = data.getSnapshot(); + this.region = ProtobufUtil.toRegionInfo(data.getRegion()); + if (data.hasTargetServer()) { + this.targetServer = ProtobufUtil.toServerName(data.getTargetServer()); + } + } + + @Override + protected void toStringClassDetails(StringBuilder builder) { + builder.append(getClass().getSimpleName()) + .append(", snapshot=").append(snapshot.getName()); + if (targetServer != null) { + builder.append(", targetServer=").append(targetServer); + } + } + + @Override + public Optional remoteCallBuild(MasterProcedureEnv env, ServerName serverName) { + SnapshotVerifyParameter.Builder builder = SnapshotVerifyParameter.newBuilder(); + builder.setSnapshot(snapshot).setRegion(ProtobufUtil.toRegionInfo(region)); + return Optional.of(new RSProcedureDispatcher.ServerOperation(this, getProcId(), + SnapshotVerifyCallable.class, builder.build().toByteArray())); + } + + @Override + public TableName getTableName() { + return TableName.valueOf(snapshot.getTable()); + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.SNAPSHOT; + } + + public ServerName getServerName() { + return targetServer; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java index 7e47586ffd92..d7d8d380b1f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java @@ -34,7 +34,7 @@ public interface TableProcedureInterface { public static final TableName DUMMY_NAMESPACE_TABLE_NAME = TableName.NAMESPACE_TABLE_NAME; public enum TableOperationType { - CREATE, DELETE, DISABLE, EDIT, ENABLE, READ, + CREATE, DELETE, DISABLE, EDIT, ENABLE, READ, SNAPSHOT, REGION_SNAPSHOT, REGION_EDIT, REGION_SPLIT, REGION_MERGE, REGION_ASSIGN, REGION_UNASSIGN, REGION_GC, MERGED_REGIONS_GC/* region operations */ } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java index 6fb147e1d6a7..3a53a1fc5da7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java @@ -56,6 +56,7 @@ private static boolean requireTableExclusiveLock(TableProcedureInterface proc) { // we allow concurrent edit on the ns family in meta table return !proc.getTableName().equals(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME); case READ: + case SNAPSHOT: return false; // region operations are using the shared-lock on the table // and then they will grab an xlock on the region. @@ -66,6 +67,7 @@ private static boolean requireTableExclusiveLock(TableProcedureInterface proc) { case REGION_EDIT: case REGION_GC: case MERGED_REGIONS_GC: + case REGION_SNAPSHOT: return false; default: break; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java index 90602edc5a09..8d538ee2f6fe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.TableName; @@ -98,12 +97,10 @@ public MasterSnapshotVerifier(MasterServices services, /** * Verify that the snapshot in the directory is a valid snapshot * @param snapshotDir snapshot directory to check - * @param snapshotServers {@link org.apache.hadoop.hbase.ServerName} of the servers - * that are involved in the snapshot * @throws CorruptedSnapshotException if the snapshot is invalid * @throws IOException if there is an unexpected connection issue to the filesystem */ - public void verifySnapshot(Path snapshotDir, Set snapshotServers) + public void verifySnapshot(Path snapshotDir, boolean verifyRegions) throws CorruptedSnapshotException, IOException { SnapshotManifest manifest = SnapshotManifest.open(services.getConfiguration(), workingDirFs, snapshotDir, snapshot); @@ -114,7 +111,7 @@ public void verifySnapshot(Path snapshotDir, Set snapshotServers) verifyTableInfo(manifest); // check that each region is valid - verifyRegions(manifest); + verifyRegions(manifest, verifyRegions); } /** @@ -154,7 +151,7 @@ private void verifyTableInfo(final SnapshotManifest manifest) throws IOException * @param manifest snapshot manifest to inspect * @throws IOException if we can't reach hbase:meta or read the files from the FS */ - private void verifyRegions(final SnapshotManifest manifest) throws IOException { + private void verifyRegions(SnapshotManifest manifest, boolean verifyRegions) throws IOException { List regions = services.getAssignmentManager().getTableRegions(tableName, false); // Remove the non-default regions RegionReplicaUtil.removeNonDefaultRegions(regions); @@ -182,27 +179,30 @@ private void verifyRegions(final SnapshotManifest manifest) throws IOException { } // Verify RegionInfo - for (RegionInfo region : regions) { - SnapshotRegionManifest regionManifest = regionManifests.get(region.getEncodedName()); - if (regionManifest == null) { - // could happen due to a move or split race. 
- String mesg = " No snapshot region directory found for region:" + region; - if (errorMsg.isEmpty()) errorMsg = mesg; - LOG.error(mesg); - continue; + if (verifyRegions) { + for (RegionInfo region : regions) { + SnapshotRegionManifest regionManifest = regionManifests.get(region.getEncodedName()); + if (regionManifest == null) { + // could happen due to a move or split race. + String mesg = " No snapshot region directory found for region:" + region; + if (errorMsg.isEmpty()) { + errorMsg = mesg; + } + LOG.error(mesg); + continue; + } + + verifyRegionInfo(region, regionManifest); + } + if (!errorMsg.isEmpty()) { + throw new CorruptedSnapshotException(errorMsg); } - - verifyRegionInfo(region, regionManifest); - } - - if (!errorMsg.isEmpty()) { - throw new CorruptedSnapshotException(errorMsg); - } // Verify Snapshot HFiles // Requires the root directory file system as HFiles are stored in the root directory - SnapshotReferenceUtil.verifySnapshot(services.getConfiguration(), - CommonFSUtils.getRootDirFileSystem(services.getConfiguration()), manifest); + SnapshotReferenceUtil.verifySnapshot(services.getConfiguration(), + CommonFSUtils.getRootDirFileSystem(services.getConfiguration()), manifest); + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java index 14626eb64415..f804dff9998d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -35,6 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; @@ -42,6 +44,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.TableDescriptor; @@ -55,17 +58,24 @@ import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MetricsMaster; import org.apache.hadoop.hbase.master.SnapshotSentinel; +import org.apache.hadoop.hbase.master.WorkerAssigner; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner; import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure; +import org.apache.hadoop.hbase.master.procedure.SnapshotProcedure; +import org.apache.hadoop.hbase.master.procedure.SnapshotVerifyProcedure; import org.apache.hadoop.hbase.procedure.MasterProcedureManager; import org.apache.hadoop.hbase.procedure.Procedure; import 
org.apache.hadoop.hbase.procedure.ProcedureCoordinator; import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerValidationUtils; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.User; @@ -155,6 +165,11 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable public static final String SNAPSHOT_MAX_FILE_SIZE_PRESERVE = "hbase.snapshot.max.filesize.preserve"; + /** Enable or disable snapshot procedure */ + public static final String SNAPSHOT_PROCEDURE_ENABLED = "hbase.snapshot.procedure.enabled"; + + public static final boolean SNAPSHOT_PROCEDURE_ENABLED_DEFAULT = true; + private boolean stopped; private MasterServices master; // Needed by TableEventHandlers private ProcedureCoordinator coordinator; @@ -181,6 +196,12 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable // snapshot using Procedure-V2. private Map restoreTableToProcIdMap = new HashMap<>(); + // SnapshotDescription -> SnapshotProcId + private final ConcurrentHashMap + snapshotToProcIdMap = new ConcurrentHashMap<>(); + + private WorkerAssigner verifyWorkerAssigner; + private Path rootDir; private ExecutorService executorService; @@ -292,18 +313,41 @@ private List getCompletedSnapshots(Path snapshotDir, boolea } /** - * Cleans up any snapshots in the snapshot/.tmp directory that were left from failed - * snapshot attempts. + * Cleans up any zk-coordinated snapshots in the snapshot/.tmp directory that were left from + * failed snapshot attempts. For unfinished procedure2-coordinated snapshots, keep the working + * directory. * * @throws IOException if we can't reach the filesystem */ private void resetTempDir() throws IOException { - // cleanup any existing snapshots. 
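+    // NOTE: a procedure2-coordinated snapshot survives a master restart and resumes
+    // writing into its working directory, so we must not blindly wipe the whole tmp dir
+    // here; only working directories that do not belong to a live SnapshotProcedure
+    // (per snapshotToProcIdMap) are removed below.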
+ Set workingProcedureCoordinatedSnapshotNames = + snapshotToProcIdMap.keySet().stream().map(s -> s.getName()).collect(Collectors.toSet()); + Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, - master.getConfiguration()); + master.getConfiguration()); FileSystem tmpFs = tmpdir.getFileSystem(master.getConfiguration()); - if (!tmpFs.delete(tmpdir, true)) { - LOG.warn("Couldn't delete working snapshot directory: " + tmpdir); + FileStatus[] workingSnapshotDirs = CommonFSUtils.listStatus(tmpFs, tmpdir); + if (workingSnapshotDirs == null) { + return; + } + for (FileStatus workingSnapshotDir : workingSnapshotDirs) { + String workingSnapshotName = workingSnapshotDir.getPath().getName(); + if (!workingProcedureCoordinatedSnapshotNames.contains(workingSnapshotName)) { + try { + if (tmpFs.delete(workingSnapshotDir.getPath(), true)) { + LOG.info("delete unfinished zk-coordinated snapshot working directory {}", + workingSnapshotDir.getPath()); + } else { + LOG.warn("Couldn't delete unfinished zk-coordinated snapshot working directory {}", + workingSnapshotDir.getPath()); + } + } catch (IOException e) { + LOG.warn("Couldn't delete unfinished zk-coordinated snapshot working directory {}", + workingSnapshotDir.getPath(), e); + } + } else { + LOG.debug("find working directory of unfinished procedure {}", workingSnapshotName); + } } } @@ -363,6 +407,15 @@ public boolean isSnapshotDone(SnapshotDescription expected) throws IOException { "No snapshot name passed in request, can't figure out which snapshot you want to check."); } + Long procId = snapshotToProcIdMap.get(expected); + if (procId != null) { + if (master.getMasterProcedureExecutor().isRunning()) { + return master.getMasterProcedureExecutor().isFinished(procId); + } else { + return false; + } + } + String ssString = ClientSnapshotDescriptionUtils.toString(expected); // check to see if the sentinel exists, @@ -418,15 +471,18 @@ public boolean isSnapshotDone(SnapshotDescription expected) throws IOException { * Currently we have a limitation only allowing a single snapshot per table at a time. Also we * don't allow snapshot with the same name. * @param snapshot description of the snapshot being checked. + * @param checkTable check if the table is already taking a snapshot. * @return true if there is a snapshot in progress with the same name or on the same * table. 
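+   *         (in both coordination modes a clash on the snapshot name is rejected:
+   *         zk-coordinated snapshots are tracked in snapshotHandlers,
+   *         procedure-coordinated ones in snapshotToProcIdMap)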
 */
-  synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot) {
-    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
-    if (isTakingSnapshot(snapshotTable)) {
-      return true;
+  synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot, boolean checkTable) {
+    if (checkTable) {
+      TableName snapshotTable = TableName.valueOf(snapshot.getTable());
+      if (isTakingSnapshot(snapshotTable)) {
+        return true;
+      }
     }
-    Iterator<Map.Entry<TableName, SnapshotSentinel>> it = this.snapshotHandlers.entrySet().iterator();
+    Iterator<Map.Entry<TableName, SnapshotSentinel>> it = snapshotHandlers.entrySet().iterator();
     while (it.hasNext()) {
       Map.Entry<TableName, SnapshotSentinel> entry = it.next();
       SnapshotSentinel sentinel = entry.getValue();
@@ -434,6 +490,14 @@ synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot) {
         return true;
       }
     }
+    Iterator<Map.Entry<SnapshotDescription, Long>> spIt = snapshotToProcIdMap.entrySet().iterator();
+    while (spIt.hasNext()) {
+      Map.Entry<SnapshotDescription, Long> entry = spIt.next();
+      if (snapshot.getName().equals(entry.getKey().getName())
+          && !master.getMasterProcedureExecutor().isFinished(entry.getValue())) {
+        return true;
+      }
+    }
     return false;
   }
 
@@ -444,8 +508,39 @@ synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot) {
    * @return true if there is a snapshot in progress on the specified table.
    */
   public boolean isTakingSnapshot(final TableName tableName) {
+    return isTakingSnapshot(tableName, false);
+  }
+
+  public boolean isTableTakingAnySnapshot(final TableName tableName) {
+    return isTakingSnapshot(tableName, true);
+  }
+
+  /**
+   * Check to see if the specified table has a snapshot in progress. Since the introduction of
+   * the SnapshotProcedure this is a little different from before: for a zk-coordinated
+   * snapshot it is enough to consider the tables in snapshotHandlers, but
+   * {@link org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure} and
+   * {@link org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure} must also
+   * consider the tables in snapshotToProcIdMap. The snapshot procedure itself does not need
+   * the per-table check.
+   * @param tableName name of the table being snapshotted.
+   * @param checkProcedure true if we should also check the tables in snapshotToProcIdMap
+   * @return true if there is a snapshot in progress on the specified table.
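+   * <p>A hypothetical caller-side sketch of the two public wrappers (illustrative, not
+   * code from this patch):
+   * <pre>
+   *   // merge/split procedures must block on a snapshot in either coordination mode:
+   *   if (snapshotManager.isTableTakingAnySnapshot(tableName)) {
+   *     throw new IOException("region operation rejected: snapshot in progress");
+   *   }
+   *   // legacy callers that only care about zk-coordinated handlers:
+   *   if (snapshotManager.isTakingSnapshot(tableName)) { ... }
+   * </pre>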
+ */ + private synchronized boolean isTakingSnapshot(TableName tableName, boolean checkProcedure) { SnapshotSentinel handler = this.snapshotHandlers.get(tableName); - return handler != null && !handler.isFinished(); + if (handler != null && !handler.isFinished()) { + return true; + } + if (checkProcedure) { + for (Map.Entry entry : snapshotToProcIdMap.entrySet()) { + if (TableName.valueOf(entry.getKey().getTable()).equals(tableName) + && !master.getMasterProcedureExecutor().isFinished(entry.getValue())) { + return true; + } + } + } + return false; } /** @@ -454,30 +549,10 @@ public boolean isTakingSnapshot(final TableName tableName) { * @param snapshot description of the snapshot we want to start * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot */ - private synchronized void prepareToTakeSnapshot(SnapshotDescription snapshot) + public synchronized void prepareWorkingDirectory(SnapshotDescription snapshot) throws HBaseSnapshotException { Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, master.getConfiguration()); - TableName snapshotTable = - TableName.valueOf(snapshot.getTable()); - - // make sure we aren't already running a snapshot - if (isTakingSnapshot(snapshot)) { - SnapshotSentinel handler = this.snapshotHandlers.get(snapshotTable); - throw new SnapshotCreationException("Rejected taking " - + ClientSnapshotDescriptionUtils.toString(snapshot) - + " because we are already running another snapshot " - + (handler != null ? ("on the same table " + - ClientSnapshotDescriptionUtils.toString(handler.getSnapshot())) - : "with the same name"), ProtobufUtil.createSnapshotDesc(snapshot)); - } - - // make sure we aren't running a restore on the same table - if (isRestoringTable(snapshotTable)) { - throw new SnapshotCreationException("Rejected taking " - + ClientSnapshotDescriptionUtils.toString(snapshot) - + " because we are already have a restore in progress on the same snapshot."); - } try { FileSystem workingDirFS = workingDir.getFileSystem(master.getConfiguration()); @@ -508,7 +583,7 @@ private synchronized void prepareToTakeSnapshot(SnapshotDescription snapshot) private synchronized void snapshotDisabledTable(SnapshotDescription snapshot) throws IOException { // setup the snapshot - prepareToTakeSnapshot(snapshot); + prepareWorkingDirectory(snapshot); // set the snapshot to be a disabled snapshot, since the client doesn't know about that snapshot = snapshot.toBuilder().setType(Type.DISABLED).build(); @@ -528,7 +603,7 @@ private synchronized void snapshotDisabledTable(SnapshotDescription snapshot) private synchronized void snapshotEnabledTable(SnapshotDescription snapshot) throws IOException { // setup the snapshot - prepareToTakeSnapshot(snapshot); + prepareWorkingDirectory(snapshot); // Take the snapshot of the enabled table EnabledTableSnapshotHandler handler = @@ -583,7 +658,9 @@ public ReadWriteLock getTakingSnapshotLock() { * @return true to indicate that there're some running snapshots. 
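+   *         (this now counts zk-coordinated handlers, unfinished snapshot procedures
+   *         registered in snapshotToProcIdMap, and any thread holding the
+   *         taking-snapshot read lock)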
*/ public synchronized boolean isTakingAnySnapshot() { - return this.takingSnapshotLock.getReadHoldCount() > 0 || this.snapshotHandlers.size() > 0; + return this.takingSnapshotLock.getReadHoldCount() > 0 + || this.snapshotHandlers.size() > 0 + || this.snapshotToProcIdMap.size() > 0; } /** @@ -601,56 +678,46 @@ public void takeSnapshot(SnapshotDescription snapshot) throws IOException { } } - private void takeSnapshotInternal(SnapshotDescription snapshot) throws IOException { - // check to see if we already completed the snapshot - if (isSnapshotCompleted(snapshot)) { - throw new SnapshotExistsException( - "Snapshot '" + snapshot.getName() + "' already stored on the filesystem.", - ProtobufUtil.createSnapshotDesc(snapshot)); + public synchronized long takeSnapshot(SnapshotDescription snapshot, + long nonceGroup, long nonce) throws IOException { + this.takingSnapshotLock.readLock().lock(); + try { + return submitSnapshotProcedure(snapshot, nonceGroup, nonce); + } finally { + this.takingSnapshotLock.readLock().unlock(); } + } - LOG.debug("No existing snapshot, attempting snapshot..."); + private long submitSnapshotProcedure(SnapshotDescription snapshot, + long nonceGroup, long nonce) throws IOException { + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(master, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + sanityCheckBeforeSnapshot(snapshot, false); - // stop tracking "abandoned" handlers - cleanupSentinels(); + long procId = submitProcedure(new SnapshotProcedure( + getMaster().getMasterProcedureExecutor().getEnvironment(), snapshot)); - // check to see if the table exists - TableDescriptor desc = null; - try { - desc = master.getTableDescriptors().get( - TableName.valueOf(snapshot.getTable())); - } catch (FileNotFoundException e) { - String msg = "Table:" + snapshot.getTable() + " info doesn't exist!"; - LOG.error(msg); - throw new SnapshotCreationException(msg, e, ProtobufUtil.createSnapshotDesc(snapshot)); - } catch (IOException e) { - throw new SnapshotCreationException( - "Error while geting table description for table " + snapshot.getTable(), e, - ProtobufUtil.createSnapshotDesc(snapshot)); - } - if (desc == null) { - throw new SnapshotCreationException( - "Table '" + snapshot.getTable() + "' doesn't exist, can't take snapshot.", - ProtobufUtil.createSnapshotDesc(snapshot)); - } - SnapshotDescription.Builder builder = snapshot.toBuilder(); - // if not specified, set the snapshot format - if (!snapshot.hasVersion()) { - builder.setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION); - } - RpcServer.getRequestUser().ifPresent(user -> { - if (AccessChecker.isAuthorizationSupported(master.getConfiguration())) { - builder.setOwner(user.getShortName()); - } - }); - snapshot = builder.build(); + getMaster().getSnapshotManager().registerSnapshotProcedure(snapshot, procId); + } + + @Override + protected String getDescription() { + return "SnapshotProcedure"; + } + }); + } + + private void takeSnapshotInternal(SnapshotDescription snapshot) throws IOException { + TableDescriptor desc = sanityCheckBeforeSnapshot(snapshot, true); // call pre coproc hook MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost(); org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null; if (cpHost != null) { snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot); - cpHost.preSnapshot(snapshotPOJO, desc); + cpHost.preSnapshot(snapshotPOJO, desc, RpcServer.getRequestUser().orElse(null)); } // if the table is 
enabled, then actually have the RS run the snapshot work
@@ -688,8 +755,68 @@ else if (master.getTableStateManager().isTableState(snapshotTable,
     // call post coproc hook
     if (cpHost != null) {
-      cpHost.postSnapshot(snapshotPOJO, desc);
+      cpHost.postSnapshot(snapshotPOJO, desc, RpcServer.getRequestUser().orElse(null));
+    }
+  }
+
+  /**
+   * Check if the snapshot can be taken. Currently we have some limitations: for a
+   * zk-coordinated snapshot we allow neither a snapshot with the same name nor multiple
+   * snapshots of a table at the same time; for a procedure-coordinated snapshot we only
+   * disallow a snapshot with the same name.
+   * @param snapshot description of the snapshot being checked.
+   * @param checkTable check if the table is already taking a snapshot. For a zk-coordinated
+   *                   snapshot we need to check whether another zk-coordinated snapshot is in
+   *                   progress; for the snapshot procedure this is unnecessary.
+   * @return the table descriptor of the table
+   */
+  private synchronized TableDescriptor sanityCheckBeforeSnapshot(
+      SnapshotDescription snapshot, boolean checkTable) throws IOException {
+    // check to see if we already completed the snapshot
+    if (isSnapshotCompleted(snapshot)) {
+      throw new SnapshotExistsException("Snapshot '" + snapshot.getName()
+        + "' already stored on the filesystem.", ProtobufUtil.createSnapshotDesc(snapshot));
+    }
+    LOG.debug("No existing snapshot, attempting snapshot...");
+
+    // stop tracking "abandoned" handlers
+    cleanupSentinels();
+
+    TableName snapshotTable =
+      TableName.valueOf(snapshot.getTable());
+    // make sure we aren't already running a snapshot
+    if (isTakingSnapshot(snapshot, checkTable)) {
+      throw new SnapshotCreationException("Rejected taking "
+        + ClientSnapshotDescriptionUtils.toString(snapshot)
+        + " because we are already running another snapshot"
+        + " on the same table or with the same name");
+    }
+
+    // make sure we aren't running a restore on the same table
+    if (isRestoringTable(snapshotTable)) {
+      throw new SnapshotCreationException("Rejected taking "
+        + ClientSnapshotDescriptionUtils.toString(snapshot)
+        + " because we already have a restore in progress on the same snapshot.");
+    }
+
+    // check to see if the table exists
+    TableDescriptor desc = null;
+    try {
+      desc = master.getTableDescriptors().get(TableName.valueOf(snapshot.getTable()));
+    } catch (FileNotFoundException e) {
+      String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
+      LOG.error(msg);
+      throw new SnapshotCreationException(msg, e, ProtobufUtil.createSnapshotDesc(snapshot));
+    } catch (IOException e) {
+      throw new SnapshotCreationException(
+        "Error while getting table description for table " + snapshot.getTable(), e,
+        ProtobufUtil.createSnapshotDesc(snapshot));
+    }
+    if (desc == null) {
+      throw new SnapshotCreationException(
+        "Table '" + snapshot.getTable() + "' doesn't exist, can't take snapshot.",
+        ProtobufUtil.createSnapshotDesc(snapshot));
     }
+    return desc;
   }
 
   /**
@@ -793,7 +920,7 @@ synchronized long cloneSnapshot(final SnapshotDescription snapshot,
     TableName tableName = tableDescriptor.getTableName();
 
     // make sure we aren't running a snapshot on the same table
-    if (isTakingSnapshot(tableName)) {
+    if (isTableTakingAnySnapshot(tableName)) {
       throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
     }
 
@@ -933,7 +1060,7 @@ private synchronized long restoreSnapshot(final SnapshotDescription snapshot,
     final TableName tableName = tableDescriptor.getTableName();
 
     // make sure we aren't running a snapshot on the same table
-
if (isTakingSnapshot(tableName)) { + if (isTableTakingAnySnapshot(tableName)) { throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName); } @@ -1021,6 +1148,7 @@ private synchronized SnapshotSentinel removeSentinelIfFinished( private void cleanupSentinels() { cleanupSentinels(this.snapshotHandlers); cleanupCompletedRestoreInMap(); + cleanupCompletedSnapshotInMap(); } /** @@ -1059,6 +1187,22 @@ private synchronized void cleanupCompletedRestoreInMap() { } } + /** + * Remove the procedures that are marked as finished + */ + private synchronized void cleanupCompletedSnapshotInMap() { + ProcedureExecutor procExec = master.getMasterProcedureExecutor(); + Iterator> it = snapshotToProcIdMap.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + Long procId = entry.getValue(); + if (procExec.isRunning() && procExec.isFinished(procId)) { + it.remove(); + } + } + } + + // // Implementing Stoppable interface // @@ -1214,11 +1358,27 @@ public void initialize(MasterServices master, MetricsMaster metricsMaster) throw this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency); this.executorService = master.getExecutorService(); + this.verifyWorkerAssigner = new WorkerAssigner(master, + conf.getInt("hbase.snapshot.verify.task.max", 3), + new ProcedureEvent<>("snapshot-verify-worker-assigning")); + restoreUnfinishedSnapshotProcedure(); + restoreWorkers(); resetTempDir(); snapshotHandlerChoreCleanerTask = scheduleThreadPool.scheduleAtFixedRate(this::cleanupSentinels, 10, 10, TimeUnit.SECONDS); } + private void restoreUnfinishedSnapshotProcedure() { + master.getMasterProcedureExecutor() + .getActiveProceduresNoCopy() + .stream().filter(p -> p instanceof SnapshotProcedure) + .filter(p -> !p.isFinished()).map(p -> (SnapshotProcedure) p) + .forEach(p -> { + registerSnapshotProcedure(p.getSnapshot(), p.getProcId()); + LOG.info("restore unfinished snapshot procedure {}", p); + }); + } + @Override public String getProcedureSignature() { return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION; @@ -1264,4 +1424,55 @@ private SnapshotDescription toSnapshotDescription(ProcedureDescription desc) builder.setType(SnapshotDescription.Type.FLUSH); return builder.build(); } + + public void registerSnapshotProcedure(SnapshotDescription snapshot, long procId) { + snapshotToProcIdMap.put(snapshot, procId); + LOG.debug("register snapshot={}, snapshot procedure id = {}", + ClientSnapshotDescriptionUtils.toString(snapshot), procId); + } + + public void unregisterSnapshotProcedure(SnapshotDescription snapshot, long procId) { + snapshotToProcIdMap.remove(snapshot, procId); + LOG.debug("unregister snapshot={}, snapshot procedure id = {}", + ClientSnapshotDescriptionUtils.toString(snapshot), procId); + } + + public boolean snapshotProcedureEnabled() { + return master.getConfiguration() + .getBoolean(SNAPSHOT_PROCEDURE_ENABLED, SNAPSHOT_PROCEDURE_ENABLED_DEFAULT); + } + + public ServerName acquireSnapshotVerifyWorker(SnapshotVerifyProcedure procedure) + throws ProcedureSuspendedException { + Optional worker = verifyWorkerAssigner.acquire(); + if (worker.isPresent()) { + LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker.get()); + return worker.get(); + } + verifyWorkerAssigner.suspend(procedure); + throw new ProcedureSuspendedException(); + } + + public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, + ServerName worker, MasterProcedureScheduler scheduler) { + LOG.debug("{} Release verify snapshot worker={}", 
procedure, worker); + verifyWorkerAssigner.release(worker); + verifyWorkerAssigner.wake(scheduler); + } + + private void restoreWorkers() { + master.getMasterProcedureExecutor().getActiveProceduresNoCopy().stream() + .filter(p -> p instanceof SnapshotVerifyProcedure) + .map(p -> (SnapshotVerifyProcedure) p) + .filter(p -> !p.isFinished()) + .filter(p -> p.getServerName() != null) + .forEach(p -> { + verifyWorkerAssigner.addUsedWorker(p.getServerName()); + LOG.debug("{} restores used worker {}", p, p.getServerName()); + }); + } + + public Integer getAvailableWorker(ServerName serverName) { + return verifyWorkerAssigner.getAvailableWorker(serverName); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java index a6dd3f8e7bb7..95579846d5d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java @@ -18,9 +18,7 @@ package org.apache.hadoop.hbase.master.snapshot; import java.io.IOException; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.CancellationException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -201,23 +199,13 @@ public void process() { snapshotRegions(regionsAndLocations); monitor.rethrowException(); - // extract each pair to separate lists - Set serverNames = new HashSet<>(); - for (Pair p : regionsAndLocations) { - if (p != null && p.getFirst() != null && p.getSecond() != null) { - RegionInfo hri = p.getFirst(); - if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) continue; - serverNames.add(p.getSecond().toString()); - } - } - // flush the in-memory state, and write the single manifest status.setStatus("Consolidate snapshot: " + snapshot.getName()); snapshotManifest.consolidate(); // verify the snapshot is valid status.setStatus("Verifying snapshot: " + snapshot.getName()); - verifier.verifySnapshot(this.workingDir, serverNames); + verifier.verifySnapshot(workingDir, true); // complete the snapshot, atomically moving from tmp to .snapshot dir. 
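     // (the zk-coordinated handler keeps full region verification on the master, hence
     // verifyRegions=true above; the procedure-based flow is presumably the one that passes
     // false and delegates per-region checks to SnapshotVerifyProcedure on region servers)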
SnapshotDescriptionUtils.completeSnapshot(this.snapshotDir, this.workingDir, this.rootFs, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 8b64f25c6687..ea9acca7d352 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -323,6 +323,8 @@ public class HRegionServer extends HBaseServerBase private JvmPauseMonitor pauseMonitor; + private RSSnapshotVerifier rsSnapshotVerifier; + /** region server process name */ public static final String REGIONSERVER = "regionserver"; @@ -501,6 +503,8 @@ public HRegionServer(final Configuration conf) throws IOException { blockCache = BlockCacheFactory.createBlockCache(conf); mobFileCache = new MobFileCache(conf); + rsSnapshotVerifier = new RSSnapshotVerifier(conf); + uncaughtExceptionHandler = (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e); @@ -1838,6 +1842,10 @@ private void startServices() throws IOException { conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1); executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( ExecutorType.RS_CLAIM_REPLICATION_QUEUE).setCorePoolSize(claimReplicationQueueThreads)); + final int rsSnapshotOperationThreads = + conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.RS_SNAPSHOT_OPERATIONS).setCorePoolSize(rsSnapshotOperationThreads)); Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", uncaughtExceptionHandler); @@ -3549,6 +3557,10 @@ public BrokenStoreFileCleaner getBrokenStoreFileCleaner(){ return brokenStoreFileCleaner; } + RSSnapshotVerifier getRsSnapshotVerifier() { + return rsSnapshotVerifier; + } + @Override protected void stopChores() { shutdownChore(nonceManagerChore); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSSnapshotVerifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSSnapshotVerifier.java new file mode 100644 index 000000000000..2b38cba0c9b6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSSnapshotVerifier.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
+import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil.StoreFileVisitor;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.StoreFile;
+
+/**
+ * Used by {@link org.apache.hadoop.hbase.master.procedure.SnapshotVerifyProcedure} to verify
+ * that the region info and store file info in the RegionManifest are intact.
+ */
+@InterfaceAudience.Private
+public class RSSnapshotVerifier {
+  private static final Logger LOG = LoggerFactory.getLogger(RSSnapshotVerifier.class);
+
+  private final LoadingCache<SnapshotDescription,
+    Pair<FileSystem, Map<String, SnapshotRegionManifest>>> SNAPSHOT_MANIFEST_CACHE;
+  private final Configuration conf;
+
+  public RSSnapshotVerifier(Configuration conf) {
+    this.conf = conf;
+    long expiredTime = conf.getLong("hbase.snapshot-manifest.cache.expired.sec", 600);
+    long maxSize = conf.getLong("hbase.snapshot-manifest.cache.max.size", 10);
+    this.SNAPSHOT_MANIFEST_CACHE = CacheBuilder.newBuilder()
+      .expireAfterAccess(expiredTime, TimeUnit.SECONDS).maximumSize(maxSize)
+      .build(new SnapshotManifestCacheLoader(conf));
+  }
+
+  public void verifyRegion(SnapshotDescription snapshot, RegionInfo region) throws IOException {
+    try {
+      Pair<FileSystem, Map<String, SnapshotRegionManifest>> cache =
+        SNAPSHOT_MANIFEST_CACHE.get(snapshot);
+      Map<String, SnapshotRegionManifest> rmMap = cache.getSecond();
+      if (rmMap == null) {
+        throw new CorruptedSnapshotException(snapshot.getName() + " looks empty");
+      }
+      SnapshotRegionManifest regionManifest = rmMap.get(region.getEncodedName());
+      if (regionManifest == null) {
+        LOG.warn("No snapshot region directory found for {}", region.getRegionNameAsString());
+        return;
+      }
+      // verify region info
+      RegionInfo manifestRegionInfo = ProtobufUtil.toRegionInfo(regionManifest.getRegionInfo());
+      if (RegionInfo.COMPARATOR.compare(region, manifestRegionInfo) != 0) {
+        String msg = "Manifest region info " + manifestRegionInfo
+          + " doesn't match expected region: " + region;
+        throw new CorruptedSnapshotException(msg, ProtobufUtil.createSnapshotDesc(snapshot));
+      }
+      // verify store file
+      SnapshotReferenceUtil.visitRegionStoreFiles(regionManifest, new StoreFileVisitor() {
+        @Override public void storeFile(RegionInfo region, String familyName, StoreFile storeFile)
+            throws IOException {
+          SnapshotReferenceUtil.verifyStoreFile(conf, cache.getFirst(),
+            /* snapshotDir= */ null, // snapshotDir is never used, so it's ok to pass null here.
+            // maybe we can remove this parameter later.
+            snapshot, region, familyName, storeFile);
+        }
+      });
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof CorruptedSnapshotException) {
+        throw new CorruptedSnapshotException(e.getCause().getMessage(),
+          ProtobufUtil.createSnapshotDesc(snapshot));
+      } else {
+        LOG.error("Failed loading snapshot manifest for {} from filesystem",
+          snapshot.getName(), e.getCause());
+        throw new IOException(e.getCause());
+      }
+    }
+  }
+
+  // to avoid loading the snapshot manifest from the filesystem for each region, cache it here
+  private static final class SnapshotManifestCacheLoader extends
+      CacheLoader<SnapshotDescription, Pair<FileSystem, Map<String, SnapshotRegionManifest>>> {
+    private final Configuration conf;
+
+    private SnapshotManifestCacheLoader(Configuration conf) {
+      this.conf = conf;
+    }
+
+    @Override
+    public Pair<FileSystem, Map<String, SnapshotRegionManifest>>
+        load(SnapshotDescription snapshot) throws Exception {
+      Path rootDir = CommonFSUtils.getRootDir(conf);
+      Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, conf);
+      FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);
+      FileSystem workingDirFS = workingDir.getFileSystem(conf);
+      SnapshotManifest manifest = SnapshotManifest.open(conf, workingDirFS, workingDir, snapshot);
+      LOG.debug("loading snapshot manifest for {} from {}", snapshot.getName(), workingDir);
+      return Pair.newPair(rootFs, manifest.getRegionManifestsMap());
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotRegionCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotRegionCallable.java
new file mode 100644
index 000000000000..d978bb7af528
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotRegionCallable.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.client.IsolationLevel; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotRegionParameter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; + +@InterfaceAudience.Private +public class SnapshotRegionCallable extends BaseRSProcedureCallable { + private static final Logger LOG = LoggerFactory.getLogger(SnapshotRegionCallable.class); + + private SnapshotDescription snapshot; + private RegionInfo regionInfo; + private ForeignExceptionDispatcher monitor; + + @Override + protected void doCall() throws Exception { + HRegion region = rs.getRegion(regionInfo.getEncodedName()); + if (region == null) { + throw new NotServingRegionException( + "snapshot=" + snapshot.getName() + ", region=" + regionInfo.getRegionNameAsString()); + } + LOG.debug("Starting snapshot operation on {}", region); + region.startRegionOperation(Region.Operation.SNAPSHOT); + try { + if (snapshot.getType() == SnapshotDescription.Type.FLUSH) { + boolean succeeded = false; + long readPt = region.getReadPoint(IsolationLevel.READ_COMMITTED); + int retryTimes = rs.getConfiguration().getInt("hbase.snapshot.flush.retryTimes", 3); + for (int i = 0; i < retryTimes; i++) { + HRegion.FlushResult res = region.flush(true); + if (res.getResult() == HRegion.FlushResult.Result.CANNOT_FLUSH) { + region.waitForFlushes(); + if (region.getMaxFlushedSeqId() >= readPt) { + succeeded = true; + break; + } + } else { + succeeded = true; + break; + } + } + if (!succeeded) { + throw new IOException( + "Unable to complete flush " + regionInfo.getRegionNameAsString() + + " after " + retryTimes + " attempts"); + } + } + LOG.debug("Snapshotting region {} for {} completed.", region, snapshot.getName()); + region.addRegionToSnapshot(snapshot, monitor); + } finally { + LOG.debug("Closing snapshot operation on {}", region); + region.closeRegionOperation(Region.Operation.SNAPSHOT); + } + } + + @Override + protected void initParameter(byte[] parameter) throws Exception { + SnapshotRegionParameter param = SnapshotRegionParameter.parseFrom(parameter); + this.snapshot = param.getSnapshot(); + this.regionInfo = ProtobufUtil.toRegionInfo(param.getRegion()); + this.monitor = new ForeignExceptionDispatcher(snapshot.getName()); + } + + @Override + public EventType getEventType() { + return EventType.RS_SNAPSHOT_REGIONS; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotVerifyCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotVerifyCallable.java new file mode 100644 index 000000000000..203d9dbd68e5 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotVerifyCallable.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyParameter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; + +@InterfaceAudience.Private +public class SnapshotVerifyCallable extends BaseRSProcedureCallable { + private SnapshotDescription snapshot; + private RegionInfo region; + + @Override + protected void doCall() throws Exception { + rs.getRsSnapshotVerifier().verifyRegion(snapshot, region); + } + + @Override + protected void initParameter(byte[] parameter) throws Exception { + SnapshotVerifyParameter param = SnapshotVerifyParameter.parseFrom(parameter); + this.snapshot = param.getSnapshot(); + this.region = ProtobufUtil.toRegionInfo(param.getRegion()); + } + + @Override + public EventType getEventType() { + return EventType.RS_VERIFY_SNAPSHOT; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java index c059792ca68e..d126ec5a7526 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java @@ -34,7 +34,9 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.PermissionStorage; import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil; import org.apache.hadoop.hbase.security.access.UserPermission; @@ -136,6 +138,8 @@ public CompletedSnaphotDirectoriesFilter(FileSystem fs) { /** By default, wait 300 seconds for a snapshot to complete */ public static final long DEFAULT_MAX_WAIT_TIME = 60000 * 5 ; + public static final String SNAPSHOT_CORRUPTED_FILE = "_CORRUPTED"; + private SnapshotDescriptionUtils() { // private constructor for utility class } @@ -226,6 +230,14 @@ public static Path getWorkingSnapshotDir(String snapshotName, final Path rootDir return getSpecifiedSnapshotDir(getWorkingSnapshotDir(rootDir, conf), snapshotName); } + /** + * Get the flag file path if the snapshot is corrupted + * @param workingDir the directory where we build the specific snapshot + * @return {@link Path} snapshot 
corrupted flag file path + */ + public static Path getCorruptedFlagFileForSnapshot(final Path workingDir) { + return new Path(workingDir, SNAPSHOT_CORRUPTED_FILE); + } /** * Get the directory within the given filepath to store the snapshot instance * @param snapshotsDir directory to store snapshot directory within @@ -296,15 +308,15 @@ public static SnapshotDescription validate(SnapshotDescription snapshot, Configu "Descriptor doesn't apply to a table, so we can't build it."); } + SnapshotDescription.Builder builder = snapshot.toBuilder(); + // set the creation time, if one hasn't been set long time = snapshot.getCreationTime(); if (time == SnapshotDescriptionUtils.NO_SNAPSHOT_START_TIME_SPECIFIED) { time = EnvironmentEdgeManager.currentTime(); LOG.debug("Creation time not specified, setting to:" + time + " (current time:" + EnvironmentEdgeManager.currentTime() + ")."); - SnapshotDescription.Builder builder = snapshot.toBuilder(); builder.setCreationTime(time); - snapshot = builder.build(); } long ttl = snapshot.getTtl(); @@ -319,8 +331,21 @@ public static SnapshotDescription validate(SnapshotDescription snapshot, Configu } ttl = defaultSnapshotTtl; } - SnapshotDescription.Builder builder = snapshot.toBuilder(); builder.setTtl(ttl); + + if (!snapshot.hasVersion()) { + builder.setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION); + LOG.debug("Snapshot {} VERSION not specified, setting to {}", snapshot.getName(), + SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION); + } + + RpcServer.getRequestUser().ifPresent(user -> { + if (AccessChecker.isAuthorizationSupported(conf)) { + builder.setOwner(user.getShortName()); + LOG.debug("Set {} as owner of Snapshot", user.getShortName()); + } + }); + snapshot = builder.build(); // set the acl to snapshot if security feature is enabled. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java index b6d3c4893660..2e8c0dfdff96 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java @@ -129,7 +129,7 @@ static void visitTableStoreFiles(final Configuration conf, final FileSystem fs, * @param visitor callback object to get the store files * @throws IOException if an error occurred while scanning the directory */ - static void visitRegionStoreFiles(final SnapshotRegionManifest manifest, + public static void visitRegionStoreFiles(final SnapshotRegionManifest manifest, final StoreFileVisitor visitor) throws IOException { RegionInfo regionInfo = ProtobufUtil.toRegionInfo(manifest.getRegionInfo()); for (SnapshotRegionManifest.FamilyFiles familyFiles: manifest.getFamilyFilesList()) { @@ -178,6 +178,16 @@ public void storeFile(final RegionInfo regionInfo, final String family, }); } + /** + * Verify the validity of the snapshot. 
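+   * <p>This overload lets the caller supply the {@link StoreFileVisitor}. A sketch (not
+   * code from this patch) that re-checks every store file against a given filesystem,
+   * where snapshotDir and snapshotDesc are the caller's values:
+   * <pre>
+   *   SnapshotReferenceUtil.verifySnapshot(conf, fs, manifest,
+   *     (regionInfo, family, storeFile) -&gt; SnapshotReferenceUtil.verifyStoreFile(
+   *       conf, fs, snapshotDir, snapshotDesc, regionInfo, family, storeFile));
+   * </pre>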
+ * + * @param visitor user-specified store file visitor + */ + public static void verifySnapshot(final Configuration conf, final FileSystem fs, + final SnapshotManifest manifest, final StoreFileVisitor visitor) throws IOException { + concurrentVisitReferencedFiles(conf, fs, manifest, "VerifySnapshot", visitor); + } + public static void concurrentVisitReferencedFiles(final Configuration conf, final FileSystem fs, final SnapshotManifest manifest, final String desc, final StoreFileVisitor visitor) throws IOException { @@ -249,7 +259,7 @@ public static void concurrentVisitReferencedFiles(final Configuration conf, fina * @throws CorruptedSnapshotException if the snapshot is corrupted * @throws IOException if an error occurred while scanning the directory */ - private static void verifyStoreFile(final Configuration conf, final FileSystem fs, + public static void verifyStoreFile(final Configuration conf, final FileSystem fs, final Path snapshotDir, final SnapshotDescription snapshot, final RegionInfo regionInfo, final String family, final SnapshotRegionManifest.StoreFile storeFile) throws IOException { TableName table = TableName.valueOf(snapshot.getTable()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedure.java new file mode 100644 index 000000000000..ffbb54bbcd72 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotProcedure.java @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.procedure; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.SnapshotDescription; +import org.apache.hadoop.hbase.client.SnapshotType; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure; +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.RegionSplitter; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; +import org.mockito.internal.stubbing.answers.AnswersWithDelay; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestSnapshotProcedure { + private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotProcedure.class); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSnapshotProcedure.class); + + private static HBaseTestingUtil TEST_UTIL; + private HMaster master; + private TableName TABLE_NAME; + private byte[] CF; + private String SNAPSHOT_NAME; + private SnapshotDescription snapshot; + private SnapshotProtos.SnapshotDescription snapshotProto; + + @Before + public void setup() throws Exception { + TEST_UTIL = new HBaseTestingUtil(); + Configuration config = TEST_UTIL.getConfiguration(); + // using SnapshotVerifyProcedure to verify snapshot + 
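// (this threshold presumably controls when verification is dispatched to region
+    // servers via SnapshotVerifyProcedure; setting it to 1 forces remote verification
+    // even for a single-region snapshot, which is what these tests exercise)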
config.setInt("hbase.snapshot.remote.verify.threshold", 1); + // disable info server. Info server is useful when we run unit tests locally, but it will + // fails integration testing of jenkins. + // config.setInt(HConstants.MASTER_INFO_PORT, 8080); + + // delay dispatch so that we can do something, for example kill a target server + config.setInt(RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY, 10000); + config.setInt(RemoteProcedureDispatcher.DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, 128); + TEST_UTIL.startMiniCluster(3); + master = TEST_UTIL.getHBaseCluster().getMaster(); + TABLE_NAME = TableName.valueOf(Bytes.toBytes("SPTestTable")); + CF = Bytes.toBytes("cf"); + SNAPSHOT_NAME = "SnapshotProcedureTest"; + snapshot = new SnapshotDescription(SNAPSHOT_NAME, TABLE_NAME, SnapshotType.FLUSH); + snapshotProto = ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot); + snapshotProto = SnapshotDescriptionUtils.validate(snapshotProto, master.getConfiguration()); + final byte[][] splitKeys = new RegionSplitter.HexStringSplit().split(10); + Table table = TEST_UTIL.createTable(TABLE_NAME, CF, splitKeys); + TEST_UTIL.loadTable(table, CF, false); + } + + @Test + public void testSimpleSnapshotTable() throws Exception { + TEST_UTIL.getAdmin().snapshot(snapshot); + SnapshotTestingUtils.assertOneSnapshotThatMatches(TEST_UTIL.getAdmin(), snapshotProto); + SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotProto, TABLE_NAME, CF); + } + + @Test + public void testMasterRestart() throws Exception { + ProcedureExecutor procExec = master.getMasterProcedureExecutor(); + MasterProcedureEnv env = procExec.getEnvironment(); + SnapshotProcedure sp = new SnapshotProcedure(env, snapshotProto); + SnapshotProcedure spySp = getDelayedOnSpecificStateSnapshotProcedure(sp, + procExec.getEnvironment(), SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS); + + long procId = procExec.submitProcedure(spySp); + + TEST_UTIL.waitFor(2000, () -> env.getMasterServices().getProcedures() + .stream().map(Procedure::getProcId).collect(Collectors.toList()).contains(procId)); + TEST_UTIL.getHBaseCluster().killMaster(master.getServerName()); + TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 30000); + TEST_UTIL.getHBaseCluster().startMaster(); + TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); + + master = TEST_UTIL.getHBaseCluster().getMaster(); + assertTrue(master.getSnapshotManager().isTakingAnySnapshot()); + assertTrue(master.getSnapshotManager().isTableTakingAnySnapshot(TABLE_NAME)); + + List unfinishedProcedures = master + .getMasterProcedureExecutor().getProcedures().stream() + .filter(p -> p instanceof SnapshotProcedure) + .filter(p -> !p.isFinished()).map(p -> (SnapshotProcedure) p) + .collect(Collectors.toList()); + assertEquals(unfinishedProcedures.size(), 1); + long newProcId = unfinishedProcedures.get(0).getProcId(); + assertEquals(procId, newProcId); + + ProcedureTestingUtility.waitProcedure(master.getMasterProcedureExecutor(), newProcId); + assertFalse(master.getSnapshotManager().isTableTakingAnySnapshot(TABLE_NAME)); + + List snapshots + = master.getSnapshotManager().getCompletedSnapshots(); + assertEquals(1, snapshots.size()); + assertEquals(SNAPSHOT_NAME, snapshots.get(0).getName()); + assertEquals(TABLE_NAME, TableName.valueOf(snapshots.get(0).getTable())); + SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotProto, TABLE_NAME, CF); + } + + @Test + public void testRegionServerCrashWhileTakingSnapshot() throws Exception { + ProcedureExecutor procExec = 
master.getMasterProcedureExecutor(); + MasterProcedureEnv env = procExec.getEnvironment(); + SnapshotProcedure sp = new SnapshotProcedure(env, snapshotProto); + long procId = procExec.submitProcedure(sp); + + SnapshotRegionProcedure snp = waitProcedureRunnableAndGetFirst( + SnapshotRegionProcedure.class, 60000); + ServerName targetServer = env.getAssignmentManager().getRegionStates() + .getRegionStateNode(snp.getRegion()).getRegionLocation(); + TEST_UTIL.getHBaseCluster().killRegionServer(targetServer); + + TEST_UTIL.waitFor(60000, () -> snp.inRetrying()); + ProcedureTestingUtility.waitProcedure(procExec, procId); + + SnapshotTestingUtils.assertOneSnapshotThatMatches(TEST_UTIL.getAdmin(), snapshotProto); + SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotProto, TABLE_NAME, CF); + } + + @Test + public void testRegionServerCrashWhileVerifyingSnapshot() throws Exception { + ProcedureExecutor procExec = master.getMasterProcedureExecutor(); + MasterProcedureEnv env = procExec.getEnvironment(); + SnapshotProcedure sp = new SnapshotProcedure(env, snapshotProto); + long procId = procExec.submitProcedure(sp); + + SnapshotVerifyProcedure svp = waitProcedureRunnableAndGetFirst( + SnapshotVerifyProcedure.class, 60000); + TEST_UTIL.waitFor(10000, () -> svp.getServerName() != null); + ServerName previousTargetServer = svp.getServerName(); + + HRegionServer rs = TEST_UTIL.getHBaseCluster().getRegionServer(previousTargetServer); + TEST_UTIL.getHBaseCluster().killRegionServer(rs.getServerName()); + TEST_UTIL.waitFor(60000, () -> svp.getServerName() != null + && !svp.getServerName().equals(previousTargetServer)); + ProcedureTestingUtility.waitProcedure(procExec, procId); + + SnapshotTestingUtils.assertOneSnapshotThatMatches(TEST_UTIL.getAdmin(), snapshotProto); + SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotProto, TABLE_NAME, CF); + } + + public > T waitProcedureRunnableAndGetFirst( + Class clazz, long timeout) throws IOException { + TEST_UTIL.waitFor(timeout, () -> master.getProcedures().stream() + .anyMatch(clazz::isInstance)); + Optional procOpt = master.getMasterProcedureExecutor().getProcedures().stream() + .filter(clazz::isInstance).map(clazz::cast).findFirst(); + assertTrue(procOpt.isPresent()); + return procOpt.get(); + } + + @Test(expected = org.apache.hadoop.hbase.snapshot.SnapshotCreationException.class) + public void testClientTakingTwoSnapshotOnSameTable() throws Exception { + Thread first = new Thread("first-client") { + @Override + public void run() { + try { + TEST_UTIL.getAdmin().snapshot(snapshot); + } catch (IOException e) { + LOG.error("first client failed taking snapshot", e); + fail("first client failed taking snapshot"); + } + } + }; + first.start(); + Thread.sleep(1000); + // we don't allow different snapshot with same name + SnapshotDescription snapshotWithSameName = + new SnapshotDescription(SNAPSHOT_NAME, TABLE_NAME, SnapshotType.SKIPFLUSH); + TEST_UTIL.getAdmin().snapshot(snapshotWithSameName); + } + + @Test(expected = org.apache.hadoop.hbase.snapshot.SnapshotCreationException.class) + public void testClientTakeSameSnapshotTwice() throws IOException, InterruptedException { + Thread first = new Thread("first-client") { + @Override + public void run() { + try { + TEST_UTIL.getAdmin().snapshot(snapshot); + } catch (IOException e) { + LOG.error("first client failed taking snapshot", e); + fail("first client failed taking snapshot"); + } + } + }; + first.start(); + Thread.sleep(1000); + TEST_UTIL.getAdmin().snapshot(snapshot); + } + + @Test + public void 
testTakeZkCoordinatedSnapshotAndProcedureCoordinatedSnapshotBoth() throws Exception { + String newSnapshotName = SNAPSHOT_NAME + "_2"; + Thread first = new Thread("procedure-snapshot") { + @Override + public void run() { + try { + TEST_UTIL.getAdmin().snapshot(snapshot); + } catch (IOException e) { + LOG.error("procedure snapshot failed", e); + fail("procedure snapshot failed"); + } + } + }; + first.start(); + Thread.sleep(1000); + + SnapshotManager sm = master.getSnapshotManager(); + TEST_UTIL.waitFor(2000, 50, () -> !sm.isTakingSnapshot(TABLE_NAME) + && sm.isTableTakingAnySnapshot(TABLE_NAME)); + + TEST_UTIL.getConfiguration().setBoolean("hbase.snapshot.zk.coordinated", true); + SnapshotDescription snapshotOnSameTable = + new SnapshotDescription(newSnapshotName, TABLE_NAME, SnapshotType.SKIPFLUSH); + SnapshotProtos.SnapshotDescription snapshotOnSameTableProto = ProtobufUtil + .createHBaseProtosSnapshotDesc(snapshotOnSameTable); + Thread second = new Thread("zk-snapshot") { + @Override + public void run() { + try { + master.getSnapshotManager().takeSnapshot(snapshotOnSameTableProto); + } catch (IOException e) { + LOG.error("zk snapshot failed", e); + fail("zk snapshot failed"); + } + } + }; + second.start(); + + TEST_UTIL.waitFor(2000, () -> sm.isTakingSnapshot(TABLE_NAME)); + TEST_UTIL.waitFor(60000, () -> sm.isSnapshotDone(snapshotOnSameTableProto) + && !sm.isTakingAnySnapshot()); + SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotProto, TABLE_NAME, CF); + SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotOnSameTableProto, TABLE_NAME, CF); + } + + @Test + public void testRunningTwoSnapshotProcedureOnSameTable() throws Exception { + String newSnapshotName = SNAPSHOT_NAME + "_2"; + SnapshotProtos.SnapshotDescription snapshotProto2 = SnapshotProtos.SnapshotDescription + .newBuilder(snapshotProto).setName(newSnapshotName).build(); + + ProcedureExecutor procExec = master.getMasterProcedureExecutor(); + MasterProcedureEnv env = procExec.getEnvironment(); + + SnapshotProcedure sp1 = new SnapshotProcedure(env, snapshotProto); + SnapshotProcedure sp2 = new SnapshotProcedure(env, snapshotProto2); + SnapshotProcedure spySp1 = getDelayedOnSpecificStateSnapshotProcedure(sp1, + procExec.getEnvironment(), SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS); + SnapshotProcedure spySp2 = getDelayedOnSpecificStateSnapshotProcedure(sp2, + procExec.getEnvironment(), SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS); + + long procId1 = procExec.submitProcedure(spySp1); + long procId2 = procExec.submitProcedure(spySp2); + TEST_UTIL.waitFor(2000, () -> env.getMasterServices().getProcedures() + .stream().map(Procedure::getProcId).collect(Collectors.toList()) + .containsAll(Arrays.asList(procId1, procId2))); + + assertFalse(procExec.isFinished(procId1)); + assertFalse(procExec.isFinished(procId2)); + + ProcedureTestingUtility.waitProcedure(master.getMasterProcedureExecutor(), procId1); + ProcedureTestingUtility.waitProcedure(master.getMasterProcedureExecutor(), procId2); + + List snapshots = + master.getSnapshotManager().getCompletedSnapshots(); + assertEquals(2, snapshots.size()); + snapshots.sort(Comparator.comparing(SnapshotProtos.SnapshotDescription::getName)); + assertEquals(SNAPSHOT_NAME, snapshots.get(0).getName()); + assertEquals(newSnapshotName, snapshots.get(1).getName()); + SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotProto, TABLE_NAME, CF); + SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotProto2, TABLE_NAME, CF); + } + + @Test + public void 
testTableInMergeWhileTakingSnapshot() throws Exception { + ProcedureExecutor procExec = master.getMasterProcedureExecutor(); + List regions = master.getAssignmentManager().getTableRegions(TABLE_NAME, true) + .stream().sorted(RegionInfo.COMPARATOR).collect(Collectors.toList()); + MergeTableRegionsProcedure mergeProc = new MergeTableRegionsProcedure( + procExec.getEnvironment(), new RegionInfo[] {regions.get(0), regions.get(1)}, false); + long mergeProcId = procExec.submitProcedure(mergeProc); + // wait until merge region procedure running + TEST_UTIL.waitFor(10000, () -> + procExec.getProcedure(mergeProcId).getState() == ProcedureState.RUNNABLE); + SnapshotProcedure sp = new SnapshotProcedure(procExec.getEnvironment(), snapshotProto); + long snapshotProcId = procExec.submitProcedure(sp); + TEST_UTIL.waitFor(2000, 1000, () -> procExec.getProcedure(snapshotProcId) != null && + procExec.getProcedure(snapshotProcId).getState() == ProcedureState.WAITING_TIMEOUT); + ProcedureTestingUtility.waitProcedure(procExec, snapshotProcId); + SnapshotTestingUtils.confirmSnapshotValid(TEST_UTIL, snapshotProto, TABLE_NAME, CF); + } + + @Test + public void testSnapshotCorruptedAndRollback() throws Exception { + ProcedureExecutor procExec = master.getMasterProcedureExecutor(); + SnapshotProcedure sp = new SnapshotProcedure(procExec.getEnvironment(), snapshotProto); + procExec.submitProcedure(sp); + TEST_UTIL.waitFor(60000, 500, () -> sp.getCurrentStateId() > + SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT_VALUE); + DistributedFileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem(); + Optional region = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream() + .filter(r -> !r.getStoreFileList(new byte[][] { CF }).isEmpty()) + .findFirst(); + assertTrue(region.isPresent()); + region.get().getStoreFileList(new byte[][] { CF }).forEach(s -> { + try { + // delete real data files to trigger the CorruptedSnapshotException + dfs.delete(new Path(s), true); + LOG.info("delete {} to make snapshot corrupt", s); + } catch (Exception e) { + LOG.warn("Failed delete {} to make snapshot corrupt", s, e); + } + } + ); + TEST_UTIL.waitFor(60000, () -> sp.isFailed() && sp.isFinished()); + Configuration conf = master.getConfiguration(); + Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir( + snapshotProto, CommonFSUtils.getRootDir(conf), conf); + assertFalse(dfs.exists(workingDir)); + assertFalse(master.getSnapshotManager().isTakingSnapshot(TABLE_NAME)); + assertFalse(master.getSnapshotManager().isTakingAnySnapshot()); + } + + + private SnapshotProcedure getDelayedOnSpecificStateSnapshotProcedure( + SnapshotProcedure sp, MasterProcedureEnv env, SnapshotState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + SnapshotProcedure spySp = Mockito.spy(sp); + Mockito.doAnswer(new AnswersWithDelay(60000, new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + return invocation.callRealMethod(); + } + })).when(spySp).executeFromState(env, state); + return spySp; + } + + @After + public void teardown() throws Exception { + if (this.master != null) { + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate( + master.getMasterProcedureExecutor(), false); + } + TEST_UTIL.shutdownMiniCluster(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotRegionProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotRegionProcedure.java new file mode 100644 
index 000000000000..a6f6ed756513 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotRegionProcedure.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.procedure; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.SnapshotDescription; +import org.apache.hadoop.hbase.client.SnapshotType; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotManifestV2; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.RegionSplitter; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestSnapshotRegionProcedure { + private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotRegionProcedure.class); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSnapshotRegionProcedure.class); + + private static HBaseTestingUtil TEST_UTIL; + private HMaster master; + private TableName tableName; + private SnapshotProtos.SnapshotDescription snapshotProto; + private Path workingDir; + private FileSystem workingDirFs; + + @Before + public void setup() throws Exception { + TEST_UTIL = new HBaseTestingUtil(); + Configuration conf = TEST_UTIL.getConfiguration(); + // disable info server. 
Info server is useful when we run unit tests locally, but it will
+    // fail the integration tests on jenkins.
+    // conf.setInt(HConstants.MASTER_INFO_PORT, 8080);
+
+    // delay dispatch so that we can do something, for example kill a target server
+    conf.setInt(RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY, 10000);
+    conf.setInt(RemoteProcedureDispatcher.DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, 128);
+    TEST_UTIL.startMiniCluster(3);
+    master = TEST_UTIL.getHBaseCluster().getMaster();
+    tableName = TableName.valueOf(Bytes.toBytes("SRPTestTable"));
+    byte[] cf = Bytes.toBytes("cf");
+    String SNAPSHOT_NAME = "SnapshotRegionProcedureTest";
+    SnapshotDescription snapshot =
+      new SnapshotDescription(SNAPSHOT_NAME, tableName, SnapshotType.FLUSH);
+    snapshotProto = ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot);
+    snapshotProto = SnapshotDescriptionUtils.validate(snapshotProto, master.getConfiguration());
+    final byte[][] splitKeys = new RegionSplitter.HexStringSplit().split(10);
+    Table table = TEST_UTIL.createTable(tableName, cf, splitKeys);
+    TEST_UTIL.loadTable(table, cf, false);
+    Path rootDir = CommonFSUtils.getRootDir(conf);
+    this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshotProto, rootDir, conf);
+    this.workingDirFs = workingDir.getFileSystem(conf);
+    if (!workingDirFs.exists(workingDir)) {
+      workingDirFs.mkdirs(workingDir);
+    }
+  }
+
+  private boolean assertRegionManifestGenerated(RegionInfo region) throws Exception {
+    // path: <workingDir>/region-manifest.<encodedRegionName>
+    String regionManifest = SnapshotManifestV2.SNAPSHOT_MANIFEST_PREFIX + region.getEncodedName();
+    Path targetPath = new Path(workingDir, regionManifest);
+    return workingDirFs.exists(targetPath);
+  }
+
+  @Test
+  public void testSimpleSnapshotRegion() throws Exception {
+    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
+    List<Pair<RegionInfo, ServerName>> regions =
+      master.getAssignmentManager().getTableRegionsAndLocations(tableName, true);
+    assertEquals(10, regions.size());
+    Pair<RegionInfo, ServerName> region = regions.get(0);
+    SnapshotRegionProcedure srp = new SnapshotRegionProcedure(snapshotProto, region.getFirst());
+    long procId = procExec.submitProcedure(srp);
+    ProcedureTestingUtility.waitProcedure(procExec, procId);
+    assertTrue(assertRegionManifestGenerated(region.getFirst()));
+  }
+
+  @Test
+  public void testRegionServerCrashWhileTakingSnapshotRegion() throws Exception {
+    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
+    List<Pair<RegionInfo, ServerName>> regions =
+      master.getAssignmentManager().getTableRegionsAndLocations(tableName, true);
+    assertEquals(10, regions.size());
+    Pair<RegionInfo, ServerName> pair = regions.get(0);
+    SnapshotRegionProcedure srp = new SnapshotRegionProcedure(snapshotProto, pair.getFirst());
+    long procId = procExec.submitProcedure(srp);
+    TEST_UTIL.getHBaseCluster().killRegionServer(pair.getSecond());
+    TEST_UTIL.waitFor(60000, () -> !pair.getSecond().equals(master.getAssignmentManager()
+      .getRegionStates().getRegionStateNode(pair.getFirst()).getRegionLocation()));
+    TEST_UTIL.waitFor(60000, () -> srp.inRetrying());
+    ProcedureTestingUtility.waitProcedure(procExec, procId);
+    assertTrue(assertRegionManifestGenerated(pair.getFirst()));
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (this.master != null) {
+      ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(
+        master.getMasterProcedureExecutor(), false);
+    }
+    TEST_UTIL.shutdownMiniCluster();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotVerifyProcedure.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotVerifyProcedure.java new file mode 100644 index 000000000000..0e2bdcc88f2b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSnapshotVerifyProcedure.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.Optional; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.SnapshotDescription; +import org.apache.hadoop.hbase.client.SnapshotType; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HStoreFile; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotManifest; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.RegionSplitter; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestSnapshotVerifyProcedure { + private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotVerifyProcedure.class); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSnapshotVerifyProcedure.class); + + private HBaseTestingUtil TEST_UTIL; + private final TableName tableName = TableName.valueOf("TestRSSnapshotVerifier"); + private final byte[] cf = Bytes.toBytes("cf"); + private final SnapshotDescription snapshot = + new 
SnapshotDescription("test-snapshot", tableName, SnapshotType.FLUSH); + private SnapshotProtos.SnapshotDescription snapshotProto = + ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot); + + @Before + public void setup() throws Exception { + TEST_UTIL = new HBaseTestingUtil(); + Configuration conf = TEST_UTIL.getConfiguration(); + // delay procedure dispatch + conf.setInt(RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY, 10000); + conf.setInt(RemoteProcedureDispatcher.DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, 128); + TEST_UTIL.startMiniCluster(3); + final byte[][] splitKeys = new RegionSplitter.HexStringSplit().split(10); + Table table = TEST_UTIL.createTable(tableName, cf, splitKeys); + TEST_UTIL.loadTable(table, cf, false); + TEST_UTIL.getAdmin().flush(tableName); + + // prepare unverified snapshot + snapshotProto = SnapshotDescriptionUtils.validate(snapshotProto, conf); + Path rootDir = CommonFSUtils.getRootDir(conf); + Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshotProto, rootDir, conf); + FileSystem workingDirFs = workingDir.getFileSystem(conf); + if (!workingDirFs.exists(workingDir)) { + workingDirFs.mkdirs(workingDir); + } + ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(snapshot.getName()); + SnapshotManifest manifest = SnapshotManifest + .create(conf, workingDirFs, workingDir, snapshotProto, monitor); + manifest.addTableDescriptor(TEST_UTIL.getHBaseCluster() + .getMaster().getTableDescriptors().get(tableName)); + SnapshotDescriptionUtils.writeSnapshotInfo(snapshotProto, workingDir, workingDirFs); + TEST_UTIL.getHBaseCluster() + .getRegions(tableName).forEach(r -> { + try { + r.addRegionToSnapshot(snapshotProto, monitor); + } catch (IOException e) { + LOG.warn("Failed snapshot region {}", r.getRegionInfo()); + } + }); + manifest.consolidate(); + } + + @Test + public void testSimpleVerify() throws Exception { + Optional regionOpt = TEST_UTIL.getHBaseCluster().getRegions(tableName) + .stream().filter(r -> !r.getStore(cf).getStorefiles().isEmpty()).findFirst(); + Assert.assertTrue(regionOpt.isPresent()); + HRegion region = regionOpt.get(); + SnapshotVerifyProcedure p1 = new SnapshotVerifyProcedure(snapshotProto, region.getRegionInfo()); + ProcedureExecutor procExec = + TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(); + long procId = procExec.submitProcedure(p1); + ProcedureTestingUtility.waitProcedure(procExec, procId); + Assert.assertTrue(p1.isSuccess()); + + // delete store file to trigger a CorruptedSnapshotException + for (HStoreFile file : region.getStore(cf).getStorefiles()) { + TEST_UTIL.getDFSCluster().getFileSystem().delete(file.getPath(), true); + LOG.info("delete store file {}", file.getPath()); + } + SnapshotVerifyProcedure p2 = new SnapshotVerifyProcedure(snapshotProto, region.getRegionInfo()); + long newProcId = procExec.submitProcedure(p2); + ProcedureTestingUtility.waitProcedure(procExec, newProcId); + Assert.assertTrue(p2.isSuccess()); + } + + @Test + public void testRestartMaster() throws Exception { + RegionInfo region = TEST_UTIL.getHBaseCluster().getRegions(tableName).get(0).getRegionInfo(); + SnapshotVerifyProcedure svp = new SnapshotVerifyProcedure(snapshotProto, region); + HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); + long procId = master.getMasterProcedureExecutor().submitProcedure(svp); + TEST_UTIL.waitFor(10000, () -> svp.getServerName() != null); + ServerName worker = svp.getServerName(); + int availableWorker = master.getSnapshotManager().getAvailableWorker(worker); + + // restart 
master
+    TEST_UTIL.getHBaseCluster().killMaster(master.getServerName());
+    TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 30000);
+    TEST_UTIL.getHBaseCluster().startMaster();
+    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
+
+    // restore used worker
+    master = TEST_UTIL.getHBaseCluster().getMaster();
+    SnapshotVerifyProcedure svp2 = master.getMasterProcedureExecutor()
+      .getProcedure(SnapshotVerifyProcedure.class, procId);
+    Assert.assertNotNull(svp2);
+    Assert.assertFalse(svp2.isFinished());
+    Assert.assertNotNull(svp2.getServerName());
+    // the restored procedure should remember the worker it had acquired
+    Assert.assertEquals(worker, svp2.getServerName());
+    Assert.assertEquals((int) master.getSnapshotManager().getAvailableWorker(worker),
+      availableWorker);
+
+    // release worker
+    ProcedureTestingUtility.waitProcedure(master.getMasterProcedureExecutor(), svp2);
+    Assert.assertEquals((int) master.getSnapshotManager().getAvailableWorker(worker),
+      availableWorker + 1);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotWhileRSCrashes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotWhileRSCrashes.java
deleted file mode 100644
index 1c1fcf81e3ed..000000000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotWhileRSCrashes.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package org.apache.hadoop.hbase.master.snapshot; - -import static org.junit.Assert.assertNull; - -import java.io.IOException; -import java.io.UncheckedIOException; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtil; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.master.locking.LockManager.MasterLock; -import org.apache.hadoop.hbase.master.locking.LockProcedure; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.procedure2.LockType; -import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; -import org.apache.hadoop.hbase.testclassification.MasterTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ MasterTests.class, MediumTests.class }) -public class TestSnapshotWhileRSCrashes { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestSnapshotWhileRSCrashes.class); - - private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); - - private static TableName NAME = TableName.valueOf("Cleanup"); - - private static byte[] CF = Bytes.toBytes("cf"); - - @BeforeClass - public static void setUp() throws Exception { - UTIL.startMiniCluster(3); - UTIL.createMultiRegionTable(NAME, CF); - UTIL.waitTableAvailable(NAME); - } - - @AfterClass - public static void tearDown() throws Exception { - UTIL.shutdownMiniCluster(); - } - - @Test - public void test() throws InterruptedException, IOException { - String snName = "sn"; - MasterLock lock = UTIL.getMiniHBaseCluster().getMaster().getLockManager().createMasterLock(NAME, - LockType.EXCLUSIVE, "for testing"); - lock.acquire(); - Thread t = new Thread(() -> { - try { - UTIL.getAdmin().snapshot(snName, NAME); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }); - t.setDaemon(true); - t.start(); - ProcedureExecutor procExec = - UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); - UTIL.waitFor(10000, - () -> procExec.getProcedures().stream().filter(p -> !p.isFinished()) - .filter(p -> p instanceof LockProcedure).map(p -> (LockProcedure) p) - .filter(p -> NAME.equals(p.getTableName())).anyMatch(p -> !p.isLocked())); - UTIL.getMiniHBaseCluster().stopRegionServer(0); - lock.release(); - // the snapshot can not work properly when there are rs crashes, so here we just want to make - // sure that the regions could online - try (Table table = UTIL.getConnection().getTable(NAME); - ResultScanner scanner = table.getScanner(CF)) { - assertNull(scanner.next()); - } - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSSnapshotVerifier.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSSnapshotVerifier.java new file mode 100644 index 000000000000..39b1418d1502 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSSnapshotVerifier.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.SnapshotDescription; +import org.apache.hadoop.hbase.client.SnapshotType; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotManifest; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.RegionSplitter; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestRSSnapshotVerifier { + private static final Logger LOG = LoggerFactory.getLogger(TestRSSnapshotVerifier.class); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRSSnapshotVerifier.class); + + private HBaseTestingUtil TEST_UTIL; + private final TableName tableName = TableName.valueOf("TestRSSnapshotVerifier"); + private final byte[] cf = Bytes.toBytes("cf"); + private final SnapshotDescription snapshot = + new SnapshotDescription("test-snapshot", tableName, SnapshotType.FLUSH); + private SnapshotProtos.SnapshotDescription snapshotProto = + ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot); + + @Before + public void setup() throws Exception { + TEST_UTIL = new HBaseTestingUtil(); + TEST_UTIL.startMiniCluster(3); + final byte[][] splitKeys = new RegionSplitter.HexStringSplit().split(10); + Table table = TEST_UTIL.createTable(tableName, cf, splitKeys); + TEST_UTIL.loadTable(table, cf, false); + TEST_UTIL.getAdmin().flush(tableName); + + // prepare unverified snapshot + Configuration conf = TEST_UTIL.getConfiguration(); + snapshotProto = SnapshotDescriptionUtils.validate(snapshotProto, conf); + Path rootDir = CommonFSUtils.getRootDir(conf); + Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshotProto, rootDir, conf); + FileSystem workingDirFs = workingDir.getFileSystem(conf); + if (!workingDirFs.exists(workingDir)) { + workingDirFs.mkdirs(workingDir); + } + ForeignExceptionDispatcher monitor = new 
ForeignExceptionDispatcher(snapshot.getName()); + SnapshotManifest manifest = SnapshotManifest + .create(conf, workingDirFs, workingDir, snapshotProto, monitor); + manifest.addTableDescriptor(TEST_UTIL.getHBaseCluster() + .getMaster().getTableDescriptors().get(tableName)); + SnapshotDescriptionUtils.writeSnapshotInfo(snapshotProto, workingDir, workingDirFs); + TEST_UTIL.getHBaseCluster() + .getRegions(tableName).forEach(r -> { + try { + r.addRegionToSnapshot(snapshotProto, monitor); + } catch (IOException e) { + LOG.warn("Failed snapshot region {}", r.getRegionInfo()); + } + }); + manifest.consolidate(); + } + + @Test(expected = org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException.class) + public void testVerifyStoreFile() throws Exception { + RSSnapshotVerifier verifier = TEST_UTIL + .getHBaseCluster().getRegionServer(0).getRsSnapshotVerifier(); + HRegion region = TEST_UTIL.getHBaseCluster().getRegions(tableName).stream() + .filter(r -> !r.getStore(cf).getStorefiles().isEmpty()).findFirst().get(); + Path filePath = new ArrayList<>(region.getStore(cf).getStorefiles()).get(0).getPath(); + TEST_UTIL.getDFSCluster().getFileSystem().delete(filePath, true); + LOG.info("delete store file {}", filePath); + verifier.verifyRegion(snapshotProto, region.getRegionInfo()); + } + + @After + public void teardown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } +}
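
The test suites above exercise both snapshot implementations end to end. For quick reference, the coordination toggle they rely on boils down to a few lines; the following is a minimal sketch, not part of the patch, assuming the TEST_UTIL mini cluster, the snapshot/TABLE_NAME fixtures and the active master reference from TestSnapshotProcedure (the "sn-zk" snapshot name is illustrative):

    // Default with this patch: the admin call runs the snapshot as a
    // SnapshotProcedure on the master.
    TEST_UTIL.getAdmin().snapshot(snapshot);

    // Legacy path: opt back into zk coordination via configuration and drive
    // the SnapshotManager directly, as the mixed zk/procedure test above does.
    TEST_UTIL.getConfiguration().setBoolean("hbase.snapshot.zk.coordinated", true);
    SnapshotProtos.SnapshotDescription zkProto = ProtobufUtil.createHBaseProtosSnapshotDesc(
      new SnapshotDescription("sn-zk", TABLE_NAME, SnapshotType.SKIPFLUSH));
    master.getSnapshotManager().takeSnapshot(zkProto);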