Skip to content

Commit

Permalink
HBASE-24962 Optimize BufferNode Lock
Browse files Browse the repository at this point in the history
  • Loading branch information
cuibo01 committed Sep 4, 2020
1 parent 85a174f commit aeed6f5
Showing 1 changed file with 44 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand Down Expand Up @@ -298,6 +299,8 @@ protected <T extends RemoteOperation> List<T> fetchType(
// ============================================================================================
private final class TimeoutExecutorThread extends Thread {
private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>();
private final ConcurrentHashMap<DelayedWithTimeout, DelayedWithTimeout> pendingBufferNode =
new ConcurrentHashMap<>();

public TimeoutExecutorThread() {
super("ProcedureDispatcherTimeoutThread");
Expand All @@ -314,17 +317,26 @@ public void run() {
if (task instanceof DelayedTask) {
threadPool.execute(((DelayedTask) task).getObject());
} else {
pendingBufferNode.remove(task);
((BufferNode) task).dispatch();
}
}
}

private void putIfAbsent(BufferNode bufferNode) {
if (pendingBufferNode.putIfAbsent(bufferNode, bufferNode) == null) {
bufferNode.setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay);
queue.add(bufferNode);
}
}

public void add(final DelayedWithTimeout delayed) {
queue.add(delayed);
}

public void remove(final DelayedWithTimeout delayed) {
queue.remove(delayed);
pendingBufferNode.remove(delayed);
}

public void sendStopSignal() {
Expand Down Expand Up @@ -357,8 +369,9 @@ public void awaitTermination() {
*/
protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
implements RemoteNode<TEnv, TRemote> {
private Set<RemoteProcedure> operations;
private final Set<RemoteProcedure> dispatchedOperations = new HashSet<>();
private Set<RemoteProcedure> operations = Sets.newConcurrentHashSet();
private final Set<RemoteProcedure> dispatchedOperations = Sets.newConcurrentHashSet();
private final Object lock = new Object();

protected BufferNode(final TRemote key) {
super(key, 0);
Expand All @@ -370,39 +383,47 @@ public TRemote getKey() {
}

@Override
public synchronized void add(final RemoteProcedure operation) {
if (this.operations == null) {
this.operations = new HashSet<>();
setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay);
timeoutExecutor.add(this);
}
public void add(final RemoteProcedure operation) {
this.operations.add(operation);
if (this.operations.size() > queueMaxSize) {
timeoutExecutor.remove(this);
dispatch();
synchronized (lock) {
if (this.operations.size() > queueMaxSize) {
timeoutExecutor.remove(this);
dispatch();
}
//all procedure have been scheduled by the current thread or another thread.
return;
}
}
timeoutExecutor.putIfAbsent(this);
}

@Override
public synchronized void dispatch() {
if (operations != null) {
remoteDispatch(getKey(), operations);
operations.stream().filter(operation -> operation.storeInDispatchedQueue())
.forEach(operation -> dispatchedOperations.add(operation));
this.operations = null;
public void dispatch() {
Set<RemoteProcedure> operationsTmp = operations;
operations = Sets.newConcurrentHashSet();
if (operationsTmp.isEmpty()) {
return;
}
remoteDispatch(getKey(), operations);
operations.stream().filter(RemoteProcedure::storeInDispatchedQueue)
.forEach(dispatchedOperations::add);
}

public synchronized void abortOperationsInQueue() {
if (operations != null) {
abortPendingOperations(getKey(), operations);
this.operations = null;
public void abortOperationsInQueue() {
synchronized (lock) {
if (!operations.isEmpty()) {
abortPendingOperations(getKey(), operations);
operations = Sets.newConcurrentHashSet();
}
if (!dispatchedOperations.isEmpty()) {
abortPendingOperations(getKey(), dispatchedOperations);
this.dispatchedOperations.clear();
}
}
abortPendingOperations(getKey(), dispatchedOperations);
this.dispatchedOperations.clear();
}

public synchronized void operationCompleted(final RemoteProcedure remoteProcedure){
public void operationCompleted(final RemoteProcedure remoteProcedure){
this.dispatchedOperations.remove(remoteProcedure);
}

Expand Down

0 comments on commit aeed6f5

Please sign in to comment.