diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java index 6b4461402f20..bb73d99b6c33 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java @@ -140,20 +140,34 @@ protected void push(final Procedure procedure, final boolean addFront, final boo * NOTE: this method is called with the sched lock held. * @return the Procedure to execute, or null if nothing is available. */ - protected abstract Procedure dequeue(); + protected abstract Procedure dequeue(boolean highPriority); + + @Override + public Procedure pollHighPriority() { + return poll(-1, true); + } + + @Override + public Procedure pollHighPriority(long timeout, TimeUnit unit) { + return poll(unit.toNanos(timeout), true); + } @Override public Procedure poll() { - return poll(-1); + return poll(-1, false); } @Override public Procedure poll(long timeout, TimeUnit unit) { - return poll(unit.toNanos(timeout)); + return poll(unit.toNanos(timeout), false); } - @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") public Procedure poll(final long nanos) { + return poll(nanos, false); + } + + @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") + private Procedure poll(final long nanos, final boolean highPriority) { schedLock(); try { if (!running) { @@ -174,7 +188,7 @@ public Procedure poll(final long nanos) { return null; } } - final Procedure pollResult = dequeue(); + final Procedure pollResult = dequeue(highPriority); pollCalls++; nullPollCalls += (pollResult == null) ? 1 : 0; diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index f8857859131a..420f6e79016d 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -1967,13 +1967,14 @@ protected WorkerThread(ThreadGroup group, String prefix) { public void sendStopSignal() { scheduler.signalAll(); } + @Override public void run() { long lastUpdate = EnvironmentEdgeManager.currentTime(); try { while (isRunning() && keepAlive(lastUpdate)) { @SuppressWarnings("unchecked") - Procedure proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); + Procedure proc = getProcedure(); if (proc == null) { continue; } @@ -2025,6 +2026,10 @@ public long getCurrentRunTime() { protected boolean keepAlive(long lastUpdate) { return true; } + + protected Procedure getProcedure() { + return scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); + } } // A worker thread which can be added when core workers are stuck. Will timeout after @@ -2040,6 +2045,25 @@ protected boolean keepAlive(long lastUpdate) { } } + private final class HighPriorityWorkerThread extends WorkerThread { + private Procedure procedure; + + public HighPriorityWorkerThread(ThreadGroup group, Procedure proc) { + super(group, "HighPriorityPEWorker-"); + this.procedure = proc; + } + + @Override + protected boolean keepAlive(long lastUpdate) { + return false; + } + + @Override + protected Procedure getProcedure() { + return procedure; + } + } + // ---------------------------------------------------------------------------- // TODO-MAYBE: Should we provide a InlineChore to notify the store with the // full set of procedures pending and completed to write a compacted @@ -2051,7 +2075,7 @@ protected boolean keepAlive(long lastUpdate) { private final class WorkerMonitor extends InlineChore { public static final String WORKER_MONITOR_INTERVAL_CONF_KEY = "hbase.procedure.worker.monitor.interval.msec"; - private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec + private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 1000; // 1sec public static final String WORKER_STUCK_THRESHOLD_CONF_KEY = "hbase.procedure.worker.stuck.threshold.msec"; @@ -2071,6 +2095,9 @@ public WorkerMonitor() { @Override public void run() { + // accelerate high priority procedure. + accelerateHighPriority(); + final int stuckCount = checkForStuckWorkers(); checkThreadCount(stuckCount); @@ -2078,6 +2105,26 @@ public void run() { refreshConfig(); } + private void accelerateHighPriority() { + if (!scheduler.hasRunnables()) { + return; + } + while (true) { + // Poll a high priority procedure and execute it intermediately + Procedure highPriorityProcedure = scheduler.pollHighPriority(1, TimeUnit.NANOSECONDS); + if (highPriorityProcedure != null) { + final HighPriorityWorkerThread worker = + new HighPriorityWorkerThread(threadGroup, highPriorityProcedure); + workerThreads.add(worker); + worker.start(); + LOG.info("Added new HighPriority worker thread {} for highPriorityProcedure {}", worker, + highPriorityProcedure); + } else { + return; + } + } + } + private int checkForStuckWorkers() { // check if any of the worker is stuck int stuckCount = 0; diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java index d787cc0979c1..18ef559a1d54 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java @@ -95,6 +95,20 @@ public interface ProcedureScheduler { */ boolean hasRunnables(); + /** + * Fetch one high priority Procedure from the queue + * @return the Procedure to execute, or null if nothing present. + */ + Procedure pollHighPriority(); + + /** + * Fetch one high priority Procedure from the queue + * @param timeout how long to wait before giving up, in units of unit + * @param unit a TimeUnit determining how to interpret the timeout parameter + * @return the Procedure to execute, or null if nothing present. + */ + Procedure pollHighPriority(long timeout, TimeUnit unit); + /** * Fetch one Procedure from the queue * @return the Procedure to execute, or null if nothing present. diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java index feab8be16c0e..f8a537f27d3f 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java @@ -43,7 +43,7 @@ protected void enqueue(final Procedure procedure, final boolean addFront) { } @Override - protected Procedure dequeue() { + protected Procedure dequeue(boolean highPriority) { return runnables.poll(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index c470c428125a..b3f534b2b539 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -105,12 +105,16 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { (n, k) -> n.compareKey((String) k); private final static AvlKeyComparator META_QUEUE_KEY_COMPARATOR = (n, k) -> n.compareKey((TableName) k); + private static final AvlKeyComparator SERVER_HIGHPRIORITY_QUEUE_KEY_COMPARATOR = + (n, k) -> n.compareKey((ServerName) k); + private final FairQueue serverHighPriorityRunQueue = new FairQueue<>(); private final FairQueue serverRunQueue = new FairQueue<>(); private final FairQueue tableRunQueue = new FairQueue<>(); private final FairQueue peerRunQueue = new FairQueue<>(); private final FairQueue metaRunQueue = new FairQueue<>(); + private final ServerQueue[] serverHighPriorityBuckets = new ServerQueue[4]; private final ServerQueue[] serverBuckets = new ServerQueue[128]; private TableQueue tableMap = null; private PeerQueue peerMap = null; @@ -135,7 +139,11 @@ protected void enqueue(final Procedure proc, final boolean addFront) { doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront); } else if (isServerProcedure(proc)) { ServerProcedureInterface spi = (ServerProcedureInterface) proc; - doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront); + if (spi.hasMetaTableRegion()) { + doAdd(serverHighPriorityRunQueue, getServerQueue(proc), proc, addFront); + } else { + doAdd(serverRunQueue, getServerQueue(proc), proc, addFront); + } } else if (isPeerProcedure(proc)) { doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront); } else { @@ -173,25 +181,30 @@ private > void doAdd(FairQueue fairq, Queue queue, @Override protected boolean queueHasRunnables() { - return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() || - serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables(); + return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() || serverRunQueue + .hasRunnables() || peerRunQueue.hasRunnables() || serverHighPriorityRunQueue.hasRunnables(); } @Override - protected Procedure dequeue() { + protected Procedure dequeue(boolean highPriority) { // meta procedure is always the first priority Procedure pollResult = doPoll(metaRunQueue); // For now, let server handling have precedence over table handling; presumption is that it // is more important handling crashed servers than it is running the // enabling/disabling tables, etc. if (pollResult == null) { - pollResult = doPoll(serverRunQueue); - } - if (pollResult == null) { - pollResult = doPoll(peerRunQueue); + pollResult = doPoll(serverHighPriorityRunQueue); } - if (pollResult == null) { - pollResult = doPoll(tableRunQueue); + if (!highPriority) { + if (pollResult == null) { + pollResult = doPoll(serverRunQueue); + } + if (pollResult == null) { + pollResult = doPoll(peerRunQueue); + } + if (pollResult == null) { + pollResult = doPoll(tableRunQueue); + } } return pollResult; } @@ -269,6 +282,11 @@ private void clearQueue() { clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR); serverBuckets[i] = null; } + for (int i = 0; i < serverHighPriorityBuckets.length; ++i) { + clear(serverHighPriorityBuckets[i], serverHighPriorityRunQueue, + SERVER_HIGHPRIORITY_QUEUE_KEY_COMPARATOR); + serverHighPriorityBuckets[i] = null; + } // Remove Tables clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR); @@ -307,6 +325,9 @@ protected int queueSize() { for (ServerQueue serverMap : serverBuckets) { count += queueSize(serverMap); } + for (ServerQueue serverMap : serverHighPriorityBuckets) { + count += queueSize(serverMap); + } count += queueSize(tableMap); count += queueSize(peerMap); count += queueSize(metaMap); @@ -338,7 +359,7 @@ public void completionCleanup(final Procedure proc) { } else if (proc instanceof PeerProcedureInterface) { tryCleanupPeerQueue(getPeerId(proc), proc); } else if (proc instanceof ServerProcedureInterface) { - tryCleanupServerQueue(getServerName(proc), proc); + tryCleanupServerQueue(proc); } else { // No cleanup for other procedure types, yet. return; @@ -391,12 +412,28 @@ private static TableName getTableName(Procedure proc) { return ((TableProcedureInterface)proc).getTableName(); } + private ServerQueue getServerQueue(Procedure proc) { + if (isServerProcedure(proc)) { + ServerProcedureInterface spi = (ServerProcedureInterface) proc; + if (spi.hasMetaTableRegion()) { + return getServerQueue(serverHighPriorityBuckets, SERVER_HIGHPRIORITY_QUEUE_KEY_COMPARATOR, + spi.getServerName(), spi); + } else { + return getServerQueue(serverBuckets, SERVER_QUEUE_KEY_COMPARATOR, spi.getServerName(), spi); + } + } else { + return null; + } + } + // ============================================================================ // Server Queue Lookup Helpers // ============================================================================ - private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) { + private ServerQueue getServerQueue(ServerQueue[] serverBuckets, + AvlKeyComparator keyComparator, ServerName serverName, + ServerProcedureInterface proc) { final int index = getBucketIndex(serverBuckets, serverName.hashCode()); - ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); + ServerQueue node = AvlTree.get(serverBuckets[index], serverName, keyComparator); if (node != null) { return node; } @@ -411,18 +448,32 @@ private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterfa return node; } - private void removeServerQueue(ServerName serverName) { + private void removeServerQueue(ServerQueue[] serverBuckets, + AvlKeyComparator keyComparator, ServerName serverName) { int index = getBucketIndex(serverBuckets, serverName.hashCode()); - serverBuckets[index] = - AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); + serverBuckets[index] = AvlTree.remove(serverBuckets[index], serverName, keyComparator); locking.removeServerLock(serverName); } - private void tryCleanupServerQueue(ServerName serverName, Procedure proc) { + private void tryCleanupServerQueue(Procedure proc) { + ServerName serverName = getServerName(proc); + ServerProcedureInterface spi = (ServerProcedureInterface) proc; + if (spi.hasMetaTableRegion()) { + tryCleanupServerQueue(this.serverHighPriorityBuckets, this.serverHighPriorityRunQueue, + SERVER_HIGHPRIORITY_QUEUE_KEY_COMPARATOR, serverName, proc); + } else { + tryCleanupServerQueue(this.serverBuckets, this.serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR, + serverName, proc); + } + } + + private void tryCleanupServerQueue(ServerQueue[] serverBuckets, + FairQueue serverRunQueue, AvlKeyComparator keyComparator, + ServerName serverName, Procedure proc) { schedLock(); try { int index = getBucketIndex(serverBuckets, serverName.hashCode()); - ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); + ServerQueue node = AvlTree.get(serverBuckets[index], serverName, keyComparator); if (node == null) { return; } @@ -431,7 +482,7 @@ private void tryCleanupServerQueue(ServerName serverName, Procedure proc) { if (node.isEmpty() && lock.tryExclusiveLock(proc)) { removeFromRunQueue(serverRunQueue, node, () -> "clean up server queue after " + proc + " completed"); - removeServerQueue(serverName); + removeServerQueue(serverBuckets, keyComparator, serverName); } } finally { schedUnlock(); @@ -873,13 +924,14 @@ public boolean waitServerExclusiveLock(final Procedure procedure, try { final LockAndQueue lock = locking.getServerLock(serverName); if (lock.tryExclusiveLock(procedure)) { - // In tests we may pass procedures other than ServerProcedureInterface, just pass null if - // so. - removeFromRunQueue(serverRunQueue, - getServerQueue(serverName, - procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure - : null), - () -> procedure + " held exclusive lock"); + ServerProcedureInterface spi = (ServerProcedureInterface) procedure; + if (spi.hasMetaTableRegion()) { + removeFromRunQueue(serverHighPriorityRunQueue, getServerQueue(procedure), + () -> procedure + " held exclusive lock"); + } else { + removeFromRunQueue(serverRunQueue, getServerQueue(procedure), + () -> procedure + " held exclusive lock"); + } return false; } waitProcedure(lock, procedure); @@ -902,12 +954,14 @@ public void wakeServerExclusiveLock(final Procedure procedure, final ServerNa final LockAndQueue lock = locking.getServerLock(serverName); // Only SCP will acquire/release server lock so do not need to check the return value here. lock.releaseExclusiveLock(procedure); - // In tests we may pass procedures other than ServerProcedureInterface, just pass null if - // so. - addToRunQueue(serverRunQueue, - getServerQueue(serverName, - procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure - : null), () -> procedure + " released exclusive lock"); + ServerProcedureInterface spi = (ServerProcedureInterface) procedure; + if (spi.hasMetaTableRegion()) { + addToRunQueue(serverHighPriorityRunQueue, getServerQueue(procedure), + () -> procedure + " released exclusive lock"); + } else { + addToRunQueue(serverRunQueue, getServerQueue(procedure), + () -> procedure + " released exclusive lock"); + } int waitingCount = wakeWaitingProcedures(lock); wakePollIfNeeded(waitingCount); } finally { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java index 03c6dfb5cd27..4caca5402c8f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java @@ -940,6 +940,32 @@ public PeerOperationType getPeerOperationType() { } } + public static class TestServerProcedure extends TestProcedure implements ServerProcedureInterface { + private final ServerName serverName; + private final boolean carryingMeta; + + public TestServerProcedure(long procId, boolean carryingMeta, ServerName serverName) { + super(procId); + this.carryingMeta = carryingMeta; + this.serverName = serverName; + } + + @Override + public ServerName getServerName() { + return serverName; + } + + @Override + public boolean hasMetaTableRegion() { + return carryingMeta; + } + + @Override + public ServerOperationType getServerOperationType() { + return ServerOperationType.CRASH_HANDLER; + } + } + private static LockProcedure createLockProcedure(LockType lockType, long procId) throws Exception { LockProcedure procedure = new LockProcedure(); @@ -982,8 +1008,9 @@ private static void assertSharedLock(LockedResource resource, int lockCount) { @Test public void testListLocksServer() throws Exception { - LockProcedure procedure = createExclusiveLockProcedure(0); - queue.waitServerExclusiveLock(procedure, ServerName.valueOf("server1,1234,0")); + ServerName serverName = ServerName.valueOf("server1,1234,0"); + TestServerProcedure procedure = new TestServerProcedure(0, false, serverName); + queue.waitServerExclusiveLock(procedure, serverName); List resources = queue.getLocks(); assertEquals(1, resources.size()); @@ -1152,4 +1179,56 @@ public void testAcquireSharedLockWhileParentHoldingExclusiveLock() { queue.wakeRegion(proc, regionInfo); queue.wakeTableExclusiveLock(parentProc, tableName); } + + @Test + public void testHighPriorityProcedure() { + ServerName serverName = ServerName.valueOf("server1,1234,0"); + ServerName serverName2 = ServerName.valueOf("server2,1234,0"); + TestServerProcedure highPriorityProcedure = new TestServerProcedure(0, true, serverName); + TestServerProcedure normalProcedure = new TestServerProcedure(0, false, serverName2); + queue.addFront(normalProcedure); + queue.addFront(highPriorityProcedure); + //test pollHighPriority + assertEquals(2, queue.queueSize()); + assertSame(highPriorityProcedure, queue.pollHighPriority()); + assertEquals(1, queue.queueSize()); + assertSame(null, queue.pollHighPriority()); + + assertSame(normalProcedure, queue.poll()); + assertEquals(0, queue.queueSize()); + assertFalse(queue.queueHasRunnables()); + + //test poll + queue.addFront(normalProcedure); + queue.addFront(highPriorityProcedure); + assertSame(highPriorityProcedure, queue.poll()); + assertSame(normalProcedure, queue.poll()); + assertEquals(0, queue.queueSize()); + assertFalse(queue.queueHasRunnables()); + + queue.addFront(normalProcedure); + queue.addFront(highPriorityProcedure); + assertEquals(2, queue.queueSize()); + assertTrue(queue.queueHasRunnables()); + queue.clear(); + assertEquals(0, queue.queueSize()); + assertFalse(queue.queueHasRunnables()); + + queue.addFront(normalProcedure); + queue.addFront(highPriorityProcedure); + assertEquals(2, queue.queueSize()); + assertTrue(queue.queueHasRunnables()); + + TestServerProcedure proc1 = (TestServerProcedure) queue.poll(); + queue.waitServerExclusiveLock(proc1, proc1.getServerName()); + queue.wakeServerExclusiveLock(proc1, proc1.getServerName()); + queue.completionCleanup(proc1); + + TestServerProcedure proc2 = (TestServerProcedure) queue.poll(); + queue.waitServerExclusiveLock(proc2, proc2.getServerName()); + queue.wakeServerExclusiveLock(proc2, proc2.getServerName()); + queue.completionCleanup(proc2); + assertEquals(0, queue.queueSize()); + assertFalse(queue.queueHasRunnables()); + } }