From aeed6f50d0a3fdc108f8685ee7757a5c273dbcf3 Mon Sep 17 00:00:00 2001 From: Bo Cui Date: Wed, 2 Sep 2020 17:25:16 +0800 Subject: [PATCH] HBASE-24962 Optimize BufferNode Lock --- .../procedure2/RemoteProcedureDispatcher.java | 67 ++++++++++++------- 1 file changed, 44 insertions(+), 23 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java index 6da95fa58a6c..759dd8dd38c7 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hbase.thirdparty.com.google.common.collect.Sets; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -298,6 +299,8 @@ protected List fetchType( // ============================================================================================ private final class TimeoutExecutorThread extends Thread { private final DelayQueue queue = new DelayQueue(); + private final ConcurrentHashMap pendingBufferNode = + new ConcurrentHashMap<>(); public TimeoutExecutorThread() { super("ProcedureDispatcherTimeoutThread"); @@ -314,17 +317,26 @@ public void run() { if (task instanceof DelayedTask) { threadPool.execute(((DelayedTask) task).getObject()); } else { + pendingBufferNode.remove(task); ((BufferNode) task).dispatch(); } } } + private void putIfAbsent(BufferNode bufferNode) { + if (pendingBufferNode.putIfAbsent(bufferNode, bufferNode) == null) { + bufferNode.setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay); + queue.add(bufferNode); + } + } + public void add(final DelayedWithTimeout delayed) { queue.add(delayed); } public void remove(final DelayedWithTimeout delayed) { queue.remove(delayed); + pendingBufferNode.remove(delayed); } public void sendStopSignal() { @@ -357,8 +369,9 @@ public void awaitTermination() { */ protected final class BufferNode extends DelayedContainerWithTimestamp implements RemoteNode { - private Set operations; - private final Set dispatchedOperations = new HashSet<>(); + private Set operations = Sets.newConcurrentHashSet(); + private final Set dispatchedOperations = Sets.newConcurrentHashSet(); + private final Object lock = new Object(); protected BufferNode(final TRemote key) { super(key, 0); @@ -370,39 +383,47 @@ public TRemote getKey() { } @Override - public synchronized void add(final RemoteProcedure operation) { - if (this.operations == null) { - this.operations = new HashSet<>(); - setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay); - timeoutExecutor.add(this); - } + public void add(final RemoteProcedure operation) { this.operations.add(operation); if (this.operations.size() > queueMaxSize) { - timeoutExecutor.remove(this); - dispatch(); + synchronized (lock) { + if (this.operations.size() > queueMaxSize) { + timeoutExecutor.remove(this); + dispatch(); + } + //all procedure have been scheduled by the current thread or another thread. + return; + } } + timeoutExecutor.putIfAbsent(this); } @Override - public synchronized void dispatch() { - if (operations != null) { - remoteDispatch(getKey(), operations); - operations.stream().filter(operation -> operation.storeInDispatchedQueue()) - .forEach(operation -> dispatchedOperations.add(operation)); - this.operations = null; + public void dispatch() { + Set operationsTmp = operations; + operations = Sets.newConcurrentHashSet(); + if (operationsTmp.isEmpty()) { + return; } + remoteDispatch(getKey(), operations); + operations.stream().filter(RemoteProcedure::storeInDispatchedQueue) + .forEach(dispatchedOperations::add); } - public synchronized void abortOperationsInQueue() { - if (operations != null) { - abortPendingOperations(getKey(), operations); - this.operations = null; + public void abortOperationsInQueue() { + synchronized (lock) { + if (!operations.isEmpty()) { + abortPendingOperations(getKey(), operations); + operations = Sets.newConcurrentHashSet(); + } + if (!dispatchedOperations.isEmpty()) { + abortPendingOperations(getKey(), dispatchedOperations); + this.dispatchedOperations.clear(); + } } - abortPendingOperations(getKey(), dispatchedOperations); - this.dispatchedOperations.clear(); } - public synchronized void operationCompleted(final RemoteProcedure remoteProcedure){ + public void operationCompleted(final RemoteProcedure remoteProcedure){ this.dispatchedOperations.remove(remoteProcedure); }