diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java index 3b99781a5585..c0287a99435c 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java @@ -78,9 +78,13 @@ public void add(InlineChore chore) { } public void add(Procedure procedure) { - LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), - procedure.getTimeoutTimestamp()); - queue.add(new DelayedProcedure<>(procedure)); + if (procedure.getTimeout() > 0) { + LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), + procedure.getTimeoutTimestamp()); + queue.add(new DelayedProcedure<>(procedure)); + } else { + LOG.info("Got negative timeout {} for {}, skip adding", procedure.getTimeout(), procedure); + } } public boolean remove(Procedure procedure) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java index 536f232338e9..93ff27db3f72 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java @@ -25,19 +25,25 @@ import java.io.IOException; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.function.LongConsumer; import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure private List disabledPeerIds; - private List> futures; + private CompletableFuture future; private ExecutorService executor; + private RetryCounter retryCounter; + @Override public String getGlobalId() { return getClass().getSimpleName(); } + private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer) + throws ProcedureSuspendedException { + if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(conf); + } + long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); + backoffConsumer.accept(backoff); + throw suspend(Math.toIntExact(backoff), true); + } + + private void resetRetry() { + retryCounter = null; + } + private ExecutorService getExecutorService() { if (executor == null) { - executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder() + executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder() .setNameFormat(getClass().getSimpleName() + "-%d").setDaemon(true).build()); } return executor; @@ -95,14 +117,17 @@ private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws ProcedureSu peerProcCount = env.getMasterServices().getProcedures().stream() .filter(p -> p instanceof PeerProcedureInterface).filter(p -> !p.isFinished()).count(); } catch (IOException e) { - LOG.warn("failed to check peer procedure status", e); - throw suspend(5000, true); + throw suspend(env.getMasterConfiguration(), + backoff -> LOG.warn("failed to check peer procedure status, sleep {} secs and retry later", + backoff / 1000, e)); } if (peerProcCount > 0) { - LOG.info("There are still {} pending peer procedures, will sleep and check later", - peerProcCount); - throw suspend(10_000, true); + throw suspend(env.getMasterConfiguration(), + backoff -> LOG.info( + "There are still {} pending peer procedures, sleep {} secs and retry later", + peerProcCount, backoff / 1000)); } + resetRetry(); LOG.info("No pending peer procedures found, continue..."); } @@ -122,8 +147,10 @@ protected Flow executeFromState(MasterProcedureEnv env, try { oldStorage.deleteAllData(); } catch (KeeperException e) { - LOG.warn("failed to delete old replication queue data, sleep and retry later", e); - suspend(10_000, true); + throw suspend(env.getMasterConfiguration(), + backoff -> LOG.warn( + "failed to delete old replication queue data, sleep {} secs and retry later", + backoff / 1000, e)); } return Flow.NO_MORE_STATE; } @@ -132,6 +159,7 @@ protected Flow executeFromState(MasterProcedureEnv env, disabledPeerIds = peers.stream().filter(ReplicationPeerDescription::isEnabled) .map(ReplicationPeerDescription::getPeerId).collect(Collectors.toList()); setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER); + resetRetry(); return Flow.HAS_MORE_STATE; case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER: for (String peerId : disabledPeerIds) { @@ -140,39 +168,52 @@ protected Flow executeFromState(MasterProcedureEnv env, setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE); return Flow.HAS_MORE_STATE; case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE: - if (futures != null) { - // wait until all futures done - long notDone = futures.stream().filter(f -> !f.isDone()).count(); - if (notDone == 0) { - boolean succ = true; - for (Future future : futures) { - try { - future.get(); - } catch (Exception e) { - succ = false; - LOG.warn("Failed to migrate", e); - } - } - if (succ) { - shutdownExecutorService(); - setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING); - return Flow.HAS_MORE_STATE; - } - // reschedule to retry migration again - futures = null; - } else { - LOG.info("There still {} pending migration tasks, will sleep and check later", notDone); - throw suspend(10_000, true); + if (future != null) { + // should have finished when we arrive here + assert future.isDone(); + try { + future.get(); + } catch (Exception e) { + future = null; + throw suspend(env.getMasterConfiguration(), + backoff -> LOG.warn("failed to migrate queue data, sleep {} secs and retry later", + backoff / 1000, e)); } + shutdownExecutorService(); + setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING); + resetRetry(); + return Flow.HAS_MORE_STATE; } - try { - futures = env.getReplicationPeerManager() - .migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService()); - } catch (IOException e) { - LOG.warn("failed to submit migration tasks", e); - throw suspend(10_000, true); - } - throw suspend(10_000, true); + future = env.getReplicationPeerManager() + .migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService()); + FutureUtils.addListener(future, (r, e) -> { + // should acquire procedure execution lock to make sure that the procedure executor has + // finished putting this procedure to the WAITING_TIMEOUT state, otherwise there could be + // race and cause unexpected result + IdLock procLock = + env.getMasterServices().getMasterProcedureExecutor().getProcExecutionLock(); + IdLock.Entry lockEntry; + try { + lockEntry = procLock.getLockEntry(getProcId()); + } catch (IOException ioe) { + LOG.error("Error while acquiring execution lock for procedure {}" + + " when trying to wake it up, aborting...", ioe); + env.getMasterServices().abort("Can not acquire procedure execution lock", e); + return; + } + try { + setTimeoutFailure(env); + } finally { + procLock.releaseLockEntry(lockEntry); + } + }); + // here we set timeout to -1 so the ProcedureExecutor will not schedule a Timer for us + setTimeout(-1); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + // skip persistence is a must now since when restarting, if the procedure is in + // WAITING_TIMEOUT state and has -1 as timeout, it will block there forever... + skipPersistence(); + throw new ProcedureSuspendedException(); case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING: long rsWithLowerVersion = env.getMasterServices().getServerManager().getOnlineServers().values().stream() @@ -181,9 +222,11 @@ protected Flow executeFromState(MasterProcedureEnv env, setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER); return Flow.HAS_MORE_STATE; } else { - LOG.info("There are still {} region servers which have a major version less than {}, " - + "will sleep and check later", rsWithLowerVersion, MIN_MAJOR_VERSION); - throw suspend(10_000, true); + throw suspend(env.getMasterConfiguration(), + backoff -> LOG.warn( + "There are still {} region servers which have a major version" + + " less than {}, sleep {} secs and check later", + rsWithLowerVersion, MIN_MAJOR_VERSION, backoff / 1000)); } case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER: for (String peerId : disabledPeerIds) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index f3cdaddb31ca..8cfb36a1bc17 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.net.URI; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; @@ -29,10 +28,10 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -72,6 +71,7 @@ import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId; import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; @@ -797,25 +797,38 @@ private void migrateHFileRefs(ZKReplicationQueueStorageForMigration oldQueueStor } } + private interface ExceptionalRunnable { + void run() throws Exception; + } + + private CompletableFuture runAsync(ExceptionalRunnable task, ExecutorService executor) { + CompletableFuture future = new CompletableFuture<>(); + executor.execute(() -> { + try { + task.run(); + future.complete(null); + } catch (Exception e) { + future.completeExceptionally(e); + } + }); + return future; + } + /** - * Submit the migration tasks to the given {@code executor} and return the futures. + * Submit the migration tasks to the given {@code executor}. */ - List> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor) - throws IOException { + CompletableFuture migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor) { // the replication queue table creation is asynchronous and will be triggered by addPeer, so // here we need to manually initialize it since we will not call addPeer. - initializeQueueStorage(); + try { + initializeQueueStorage(); + } catch (IOException e) { + return FutureUtils.failedFuture(e); + } ZKReplicationQueueStorageForMigration oldStorage = new ZKReplicationQueueStorageForMigration(zookeeper, conf); - return Arrays.asList(executor.submit(() -> { - migrateQueues(oldStorage); - return null; - }), executor.submit(() -> { - migrateLastPushedSeqIds(oldStorage); - return null; - }), executor.submit(() -> { - migrateHFileRefs(oldStorage); - return null; - })); + return CompletableFuture.allOf(runAsync(() -> migrateQueues(oldStorage), executor), + runAsync(() -> migrateLastPushedSeqIds(oldStorage), executor), + runAsync(() -> migrateHFileRefs(oldStorage), executor)); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java index 76301ae67531..d211d707e92d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java @@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; @@ -146,9 +145,7 @@ private Map> prepareData() throws Exception { @Test public void testNoPeers() throws Exception { prepareData(); - for (Future future : manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR)) { - future.get(1, TimeUnit.MINUTES); - } + manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR).get(1, TimeUnit.MINUTES); // should have called initializer verify(queueStorageInitializer).initialize(); // should have not migrated any data since there is no peer @@ -165,9 +162,7 @@ public void testMigrate() throws Exception { // value is not used in this test, so just add a mock peers.put("peer_" + i, mock(ReplicationPeerDescription.class)); } - for (Future future : manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR)) { - future.get(1, TimeUnit.MINUTES); - } + manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR).get(1, TimeUnit.MINUTES); // should have called initializer verify(queueStorageInitializer).initialize(); List queueDatas = queueStorage.listAllQueues();