Skip to content

Commit

Permalink
HBASE-23597 Give high priority for meta assign procedure and ServerCr…
Browse files Browse the repository at this point in the history
…ashProcedure which carry meta.
  • Loading branch information
binlijin committed Sep 27, 2020
1 parent c312760 commit 123596a
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,20 +140,34 @@ protected void push(final Procedure procedure, final boolean addFront, final boo
* NOTE: this method is called with the sched lock held.
* @return the Procedure to execute, or null if nothing is available.
*/
protected abstract Procedure dequeue();
protected abstract Procedure dequeue(boolean highPriority);

@Override
public Procedure pollHighPriority() {
return poll(-1, true);
}

@Override
public Procedure pollHighPriority(long timeout, TimeUnit unit) {
return poll(unit.toNanos(timeout), true);
}

@Override
public Procedure poll() {
return poll(-1);
return poll(-1, false);
}

@Override
public Procedure poll(long timeout, TimeUnit unit) {
return poll(unit.toNanos(timeout));
return poll(unit.toNanos(timeout), false);
}

@edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
public Procedure poll(final long nanos) {
return poll(nanos, false);
}

@edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
private Procedure poll(final long nanos, final boolean highPriority) {
schedLock();
try {
if (!running) {
Expand All @@ -174,7 +188,7 @@ public Procedure poll(final long nanos) {
return null;
}
}
final Procedure pollResult = dequeue();
final Procedure pollResult = dequeue(highPriority);

pollCalls++;
nullPollCalls += (pollResult == null) ? 1 : 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1967,13 +1967,14 @@ protected WorkerThread(ThreadGroup group, String prefix) {
public void sendStopSignal() {
scheduler.signalAll();
}

@Override
public void run() {
long lastUpdate = EnvironmentEdgeManager.currentTime();
try {
while (isRunning() && keepAlive(lastUpdate)) {
@SuppressWarnings("unchecked")
Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
Procedure<TEnvironment> proc = getProcedure();
if (proc == null) {
continue;
}
Expand Down Expand Up @@ -2025,6 +2026,10 @@ public long getCurrentRunTime() {
protected boolean keepAlive(long lastUpdate) {
return true;
}

protected Procedure<TEnvironment> getProcedure() {
return scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
}
}

// A worker thread which can be added when core workers are stuck. Will timeout after
Expand All @@ -2040,6 +2045,25 @@ protected boolean keepAlive(long lastUpdate) {
}
}

private final class HighPriorityWorkerThread extends WorkerThread {
private Procedure<TEnvironment> procedure;

public HighPriorityWorkerThread(ThreadGroup group, Procedure<TEnvironment> proc) {
super(group, "HighPriorityPEWorker-");
this.procedure = proc;
}

@Override
protected boolean keepAlive(long lastUpdate) {
return false;
}

@Override
protected Procedure<TEnvironment> getProcedure() {
return procedure;
}
}

// ----------------------------------------------------------------------------
// TODO-MAYBE: Should we provide a InlineChore to notify the store with the
// full set of procedures pending and completed to write a compacted
Expand All @@ -2051,7 +2075,7 @@ protected boolean keepAlive(long lastUpdate) {
private final class WorkerMonitor extends InlineChore {
public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =
"hbase.procedure.worker.monitor.interval.msec";
private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec
private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 1000; // 1sec

public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =
"hbase.procedure.worker.stuck.threshold.msec";
Expand All @@ -2071,13 +2095,36 @@ public WorkerMonitor() {

@Override
public void run() {
// accelerate high priority procedure.
accelerateHighPriority();

final int stuckCount = checkForStuckWorkers();
checkThreadCount(stuckCount);

// refresh interval (poor man dynamic conf update)
refreshConfig();
}

private void accelerateHighPriority() {
if (!scheduler.hasRunnables()) {
return;
}
while (true) {
// Poll a high priority procedure and execute it intermediately
Procedure highPriorityProcedure = scheduler.pollHighPriority(1, TimeUnit.NANOSECONDS);
if (highPriorityProcedure != null) {
final HighPriorityWorkerThread worker =
new HighPriorityWorkerThread(threadGroup, highPriorityProcedure);
workerThreads.add(worker);
worker.start();
LOG.info("Added new HighPriority worker thread {} for highPriorityProcedure {}", worker,
highPriorityProcedure);
} else {
return;
}
}
}

private int checkForStuckWorkers() {
// check if any of the worker is stuck
int stuckCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,20 @@ public interface ProcedureScheduler {
*/
boolean hasRunnables();

/**
* Fetch one high priority Procedure from the queue
* @return the Procedure to execute, or null if nothing present.
*/
Procedure pollHighPriority();

/**
* Fetch one high priority Procedure from the queue
* @param timeout how long to wait before giving up, in units of unit
* @param unit a TimeUnit determining how to interpret the timeout parameter
* @return the Procedure to execute, or null if nothing present.
*/
Procedure pollHighPriority(long timeout, TimeUnit unit);

/**
* Fetch one Procedure from the queue
* @return the Procedure to execute, or null if nothing present.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ protected void enqueue(final Procedure procedure, final boolean addFront) {
}

@Override
protected Procedure dequeue() {
protected Procedure dequeue(boolean highPriority) {
return runnables.poll();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,16 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
(n, k) -> n.compareKey((String) k);
private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR =
(n, k) -> n.compareKey((TableName) k);
private static final AvlKeyComparator<ServerQueue> SERVER_HIGHPRIORITY_QUEUE_KEY_COMPARATOR =
(n, k) -> n.compareKey((ServerName) k);

private final FairQueue<ServerName> serverHighPriorityRunQueue = new FairQueue<>();
private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
private final FairQueue<String> peerRunQueue = new FairQueue<>();
private final FairQueue<TableName> metaRunQueue = new FairQueue<>();

private final ServerQueue[] serverHighPriorityBuckets = new ServerQueue[4];
private final ServerQueue[] serverBuckets = new ServerQueue[128];
private TableQueue tableMap = null;
private PeerQueue peerMap = null;
Expand All @@ -135,7 +139,11 @@ protected void enqueue(final Procedure proc, final boolean addFront) {
doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
} else if (isServerProcedure(proc)) {
ServerProcedureInterface spi = (ServerProcedureInterface) proc;
doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
if (spi.hasMetaTableRegion()) {
doAdd(serverHighPriorityRunQueue, getServerQueue(proc), proc, addFront);
} else {
doAdd(serverRunQueue, getServerQueue(proc), proc, addFront);
}
} else if (isPeerProcedure(proc)) {
doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
} else {
Expand Down Expand Up @@ -173,25 +181,30 @@ private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,

@Override
protected boolean queueHasRunnables() {
return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() ||
serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables();
return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() || serverRunQueue
.hasRunnables() || peerRunQueue.hasRunnables() || serverHighPriorityRunQueue.hasRunnables();
}

@Override
protected Procedure dequeue() {
protected Procedure dequeue(boolean highPriority) {
// meta procedure is always the first priority
Procedure<?> pollResult = doPoll(metaRunQueue);
// For now, let server handling have precedence over table handling; presumption is that it
// is more important handling crashed servers than it is running the
// enabling/disabling tables, etc.
if (pollResult == null) {
pollResult = doPoll(serverRunQueue);
}
if (pollResult == null) {
pollResult = doPoll(peerRunQueue);
pollResult = doPoll(serverHighPriorityRunQueue);
}
if (pollResult == null) {
pollResult = doPoll(tableRunQueue);
if (!highPriority) {
if (pollResult == null) {
pollResult = doPoll(serverRunQueue);
}
if (pollResult == null) {
pollResult = doPoll(peerRunQueue);
}
if (pollResult == null) {
pollResult = doPoll(tableRunQueue);
}
}
return pollResult;
}
Expand Down Expand Up @@ -269,6 +282,11 @@ private void clearQueue() {
clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
serverBuckets[i] = null;
}
for (int i = 0; i < serverHighPriorityBuckets.length; ++i) {
clear(serverHighPriorityBuckets[i], serverHighPriorityRunQueue,
SERVER_HIGHPRIORITY_QUEUE_KEY_COMPARATOR);
serverHighPriorityBuckets[i] = null;
}

// Remove Tables
clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
Expand Down Expand Up @@ -307,6 +325,9 @@ protected int queueSize() {
for (ServerQueue serverMap : serverBuckets) {
count += queueSize(serverMap);
}
for (ServerQueue serverMap : serverHighPriorityBuckets) {
count += queueSize(serverMap);
}
count += queueSize(tableMap);
count += queueSize(peerMap);
count += queueSize(metaMap);
Expand Down Expand Up @@ -338,7 +359,7 @@ public void completionCleanup(final Procedure proc) {
} else if (proc instanceof PeerProcedureInterface) {
tryCleanupPeerQueue(getPeerId(proc), proc);
} else if (proc instanceof ServerProcedureInterface) {
tryCleanupServerQueue(getServerName(proc), proc);
tryCleanupServerQueue(proc);
} else {
// No cleanup for other procedure types, yet.
return;
Expand Down Expand Up @@ -391,12 +412,28 @@ private static TableName getTableName(Procedure<?> proc) {
return ((TableProcedureInterface)proc).getTableName();
}

private ServerQueue getServerQueue(Procedure<?> proc) {
if (isServerProcedure(proc)) {
ServerProcedureInterface spi = (ServerProcedureInterface) proc;
if (spi.hasMetaTableRegion()) {
return getServerQueue(serverHighPriorityBuckets, SERVER_HIGHPRIORITY_QUEUE_KEY_COMPARATOR,
spi.getServerName(), spi);
} else {
return getServerQueue(serverBuckets, SERVER_QUEUE_KEY_COMPARATOR, spi.getServerName(), spi);
}
} else {
return null;
}
}

// ============================================================================
// Server Queue Lookup Helpers
// ============================================================================
private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) {
private ServerQueue getServerQueue(ServerQueue[] serverBuckets,
AvlKeyComparator<ServerQueue> keyComparator, ServerName serverName,
ServerProcedureInterface proc) {
final int index = getBucketIndex(serverBuckets, serverName.hashCode());
ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
ServerQueue node = AvlTree.get(serverBuckets[index], serverName, keyComparator);
if (node != null) {
return node;
}
Expand All @@ -411,18 +448,32 @@ private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterfa
return node;
}

private void removeServerQueue(ServerName serverName) {
private void removeServerQueue(ServerQueue[] serverBuckets,
AvlKeyComparator<ServerQueue> keyComparator, ServerName serverName) {
int index = getBucketIndex(serverBuckets, serverName.hashCode());
serverBuckets[index] =
AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
serverBuckets[index] = AvlTree.remove(serverBuckets[index], serverName, keyComparator);
locking.removeServerLock(serverName);
}

private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
private void tryCleanupServerQueue(Procedure<?> proc) {
ServerName serverName = getServerName(proc);
ServerProcedureInterface spi = (ServerProcedureInterface) proc;
if (spi.hasMetaTableRegion()) {
tryCleanupServerQueue(this.serverHighPriorityBuckets, this.serverHighPriorityRunQueue,
SERVER_HIGHPRIORITY_QUEUE_KEY_COMPARATOR, serverName, proc);
} else {
tryCleanupServerQueue(this.serverBuckets, this.serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR,
serverName, proc);
}
}

private void tryCleanupServerQueue(ServerQueue[] serverBuckets,
FairQueue<ServerName> serverRunQueue, AvlKeyComparator<ServerQueue> keyComparator,
ServerName serverName, Procedure<?> proc) {
schedLock();
try {
int index = getBucketIndex(serverBuckets, serverName.hashCode());
ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
ServerQueue node = AvlTree.get(serverBuckets[index], serverName, keyComparator);
if (node == null) {
return;
}
Expand All @@ -431,7 +482,7 @@ private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
removeFromRunQueue(serverRunQueue, node,
() -> "clean up server queue after " + proc + " completed");
removeServerQueue(serverName);
removeServerQueue(serverBuckets, keyComparator, serverName);
}
} finally {
schedUnlock();
Expand Down Expand Up @@ -873,13 +924,14 @@ public boolean waitServerExclusiveLock(final Procedure<?> procedure,
try {
final LockAndQueue lock = locking.getServerLock(serverName);
if (lock.tryExclusiveLock(procedure)) {
// In tests we may pass procedures other than ServerProcedureInterface, just pass null if
// so.
removeFromRunQueue(serverRunQueue,
getServerQueue(serverName,
procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
: null),
() -> procedure + " held exclusive lock");
ServerProcedureInterface spi = (ServerProcedureInterface) procedure;
if (spi.hasMetaTableRegion()) {
removeFromRunQueue(serverHighPriorityRunQueue, getServerQueue(procedure),
() -> procedure + " held exclusive lock");
} else {
removeFromRunQueue(serverRunQueue, getServerQueue(procedure),
() -> procedure + " held exclusive lock");
}
return false;
}
waitProcedure(lock, procedure);
Expand All @@ -902,12 +954,14 @@ public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerNa
final LockAndQueue lock = locking.getServerLock(serverName);
// Only SCP will acquire/release server lock so do not need to check the return value here.
lock.releaseExclusiveLock(procedure);
// In tests we may pass procedures other than ServerProcedureInterface, just pass null if
// so.
addToRunQueue(serverRunQueue,
getServerQueue(serverName,
procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
: null), () -> procedure + " released exclusive lock");
ServerProcedureInterface spi = (ServerProcedureInterface) procedure;
if (spi.hasMetaTableRegion()) {
addToRunQueue(serverHighPriorityRunQueue, getServerQueue(procedure),
() -> procedure + " released exclusive lock");
} else {
addToRunQueue(serverRunQueue, getServerQueue(procedure),
() -> procedure + " released exclusive lock");
}
int waitingCount = wakeWaitingProcedures(lock);
wakePollIfNeeded(waitingCount);
} finally {
Expand Down
Loading

0 comments on commit 123596a

Please sign in to comment.