Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-24962 Optimize BufferNode Lock #2343

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand Down Expand Up @@ -298,6 +299,8 @@ protected <T extends RemoteOperation> List<T> fetchType(
// ============================================================================================
private final class TimeoutExecutorThread extends Thread {
private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>();
private final ConcurrentHashMap<DelayedWithTimeout, DelayedWithTimeout> pendingBufferNode =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: please use ConcurrentMap

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?why use concurrentMap

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a general rule, it's good to handle implementation by using interface to avoid the pain of changing all references if we wish to change the implementation. I understand, we don't want to change implementation here, but it's always great to refer to implementation using interface unless we want to deliberately use implementor methods which are not overridden from interface.
Hence, just like why we would want to use Map<Key,Val> map=new HashMap<>, let's also use:

private final ConcurrentMap<DelayedWithTimeout, DelayedWithTimeout> pendingBufferNode = new ConcurrentHashMap<>();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx @virajjasani
yes, but i think it is okay to use either, RemoteProcedureDispatcher doesn't change often

new ConcurrentHashMap<>();

public TimeoutExecutorThread() {
super("ProcedureDispatcherTimeoutThread");
Expand All @@ -314,17 +317,26 @@ public void run() {
if (task instanceof DelayedTask) {
threadPool.execute(((DelayedTask) task).getObject());
} else {
pendingBufferNode.remove(task);
((BufferNode) task).dispatch();
}
}
}

private void putIfAbsent(BufferNode bufferNode) {
if (pendingBufferNode.putIfAbsent(bufferNode, bufferNode) == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to add same object as key and value, why are we using ConcurrentHashMap instead of Sets.newConcurrentHashSet()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use atomically putIfAbsent.

bufferNode.setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay);
queue.add(bufferNode);
}
}

public void add(final DelayedWithTimeout delayed) {
queue.add(delayed);
}

public void remove(final DelayedWithTimeout delayed) {
queue.remove(delayed);
pendingBufferNode.remove(delayed);
}

public void sendStopSignal() {
Expand Down Expand Up @@ -357,8 +369,9 @@ public void awaitTermination() {
*/
protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
implements RemoteNode<TEnv, TRemote> {
private Set<RemoteProcedure> operations;
private final Set<RemoteProcedure> dispatchedOperations = new HashSet<>();
private Set<RemoteProcedure> operations = Sets.newConcurrentHashSet();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RemoteProcedure is parameterized class and it is used in raw format. Although not for this PR, but we should really change this notion of using the class as parameterized in this entire class.

e.g

private Set<RemoteProcedure<TEnv, TRemote>> operations ...
public void add(final RemoteProcedure<TEnv, TRemote> operation) ...
protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure<TEnv, TRemote>> operations);

Nothing urgent, can be taken up in separate patch to well distinguish the commit purpose.

private final Set<RemoteProcedure> dispatchedOperations = Sets.newConcurrentHashSet();
private final Object lock = new Object();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this looks promising but add(RemoteProcedure operation) and abortOperationsInQueue() will now contend with each other. Do we expect both to synchronize on the same object?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, but abortOperationsInQueue does't happen very often


protected BufferNode(final TRemote key) {
super(key, 0);
Expand All @@ -370,39 +383,47 @@ public TRemote getKey() {
}

@Override
public synchronized void add(final RemoteProcedure operation) {
if (this.operations == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure we are matching this condition with timeoutExecutor.putIfAbsent(this);?
Based on putIfAbsent() method, it seems only first time when entry is added for new key in map i.e when concurrent map's thread-safe version of putIfAbsent() returns null, we execute this block but is there any scenario that might not adhere to this if (this.operations == null) condition by any chance?

Copy link
Contributor Author

@cuibo01 cuibo01 Sep 4, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TimeoutExecutorThread#run will remove task from pendingBufferNode, so after run, task will not be included in the pendingBufferNode

else {
pendingBufferNode.remove(task);
((BufferNode) task).dispatch();
}

this.operations = new HashSet<>();
setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay);
timeoutExecutor.add(this);
}
public void add(final RemoteProcedure operation) {
this.operations.add(operation);
if (this.operations.size() > queueMaxSize) {
timeoutExecutor.remove(this);
dispatch();
synchronized (lock) {
if (this.operations.size() > queueMaxSize) {
timeoutExecutor.remove(this);
dispatch();
}
//all procedure have been scheduled by the current thread or another thread.
return;
}
}
timeoutExecutor.putIfAbsent(this);
}

@Override
public synchronized void dispatch() {
if (operations != null) {
remoteDispatch(getKey(), operations);
operations.stream().filter(operation -> operation.storeInDispatchedQueue())
.forEach(operation -> dispatchedOperations.add(operation));
this.operations = null;
public void dispatch() {
Set<RemoteProcedure> operationsTmp = operations;
operations = Sets.newConcurrentHashSet();
if (operationsTmp.isEmpty()) {
return;
}
remoteDispatch(getKey(), operations);
operations.stream().filter(RemoteProcedure::storeInDispatchedQueue)
.forEach(dispatchedOperations::add);
}

public synchronized void abortOperationsInQueue() {
if (operations != null) {
abortPendingOperations(getKey(), operations);
this.operations = null;
public void abortOperationsInQueue() {
synchronized (lock) {
if (!operations.isEmpty()) {
abortPendingOperations(getKey(), operations);
operations = Sets.newConcurrentHashSet();
}
if (!dispatchedOperations.isEmpty()) {
abortPendingOperations(getKey(), dispatchedOperations);
this.dispatchedOperations.clear();
}
}
abortPendingOperations(getKey(), dispatchedOperations);
this.dispatchedOperations.clear();
}

public synchronized void operationCompleted(final RemoteProcedure remoteProcedure){
public void operationCompleted(final RemoteProcedure remoteProcedure){
this.dispatchedOperations.remove(remoteProcedure);
}

Expand Down