Skip to content

Commit

Permalink
use global threadpool to send msg
Browse files Browse the repository at this point in the history
  • Loading branch information
tynan.liu committed Jun 28, 2022
1 parent 5a28fbb commit 5a50413
Showing 1 changed file with 17 additions and 3 deletions.
20 changes: 17 additions & 3 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import com.alipay.sofa.jraft.util.*;
import com.alipay.sofa.jraft.util.concurrent.DefaultFixedThreadsExecutorGroupFactory;
import com.alipay.sofa.jraft.util.concurrent.FixedThreadsExecutorGroup;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -205,6 +206,8 @@ public class NodeImpl implements Node, RaftServerService {
private volatile int targetPriority;
/** The number of elections time out for current node */
private volatile int electionTimeoutCounter;
private static volatile FixedThreadsExecutorGroup appendEntriesExecutors;
private static final Object INIT_LOCK = new Object();

private static class NodeReadWriteLock extends LongHeldDetectingReadWriteLock {

Expand Down Expand Up @@ -869,6 +872,19 @@ private int randomTimeout(final int timeoutMs) {
return ThreadLocalRandom.current().nextInt(timeoutMs, timeoutMs + this.raftOptions.getMaxElectionDelayMs());
}

private FixedThreadsExecutorGroup getDefaultThreadPoolForSendMsg() {
if (appendEntriesExecutors == null) {
synchronized (INIT_LOCK) {
if (appendEntriesExecutors == null) {
appendEntriesExecutors = DefaultFixedThreadsExecutorGroupFactory.INSTANCE.newExecutorGroup(
Utils.APPEND_ENTRIES_THREADS_SEND, "Append-Entries-Thread-Send",
Utils.MAX_APPEND_ENTRIES_TASKS_PER_THREAD, true);
}
}
}
return appendEntriesExecutors;
}

@Override
public boolean init(final NodeOptions opts) {
Requires.requireNonNull(opts, "Null node options");
Expand All @@ -892,9 +908,7 @@ public boolean init(final NodeOptions opts) {
}

if (this.options.getAppendEntriesExecutors() == null) {
this.options.setAppendEntriesExecutors(DefaultFixedThreadsExecutorGroupFactory.INSTANCE.newExecutorGroup(
Utils.APPEND_ENTRIES_THREADS_SEND, "Append-Entries-Thread-Send",
Utils.MAX_APPEND_ENTRIES_TASKS_PER_THREAD, true));
this.options.setAppendEntriesExecutors(getDefaultThreadPoolForSendMsg());
}

ThreadPoolGroup.registerThreadPool(this.metrics.getMetricRegistry(), this.groupId,
Expand Down

0 comments on commit 5a50413

Please sign in to comment.