Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

custom thread pool #855

Merged
merged 31 commits into from
Jul 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
93b85f2
custom thread pool, todo: DefaultFixedThreadsExecutorGroupFactory.toS…
Jun 23, 2022
9eb4471
format
Jun 23, 2022
9ffe3f4
refactor
Jun 24, 2022
f176f78
add ThreadGroup
Jun 24, 2022
01a36ab
add ThreadGroup
Jun 24, 2022
852185d
Utils.runInThread
shihuili1218 Jun 24, 2022
3a04f14
Utils.runClosureInThread(com.alipay.sofa.jraft.Closure, com.alipay.so…
shihuili1218 Jun 24, 2022
ba5981e
Utils.runClosureInThread(com.alipay.sofa.jraft.Closure)
shihuili1218 Jun 24, 2022
a834d38
format
shihuili1218 Jun 24, 2022
2a8d793
unit test
shihuili1218 Jun 24, 2022
35f74d6
unit test
shihuili1218 Jun 25, 2022
afe98e0
unit test
shihuili1218 Jun 25, 2022
ea0f46b
unit test
shihuili1218 Jun 25, 2022
d8df725
unit test
shihuili1218 Jun 25, 2022
0dbc896
unit test
shihuili1218 Jun 26, 2022
a76e0d6
format
shihuili1218 Jun 26, 2022
5dac633
format
shihuili1218 Jun 26, 2022
241175c
format
shihuili1218 Jun 26, 2022
78983cb
use global threadpool to send msg
Jun 28, 2022
94e35ef
use global threadpool to send msg
Jun 28, 2022
b9971ce
format
Jun 23, 2022
5a28fbb
unit test
shihuili1218 Jun 24, 2022
5a50413
use global threadpool to send msg
Jun 28, 2022
7a2b2c3
Merge branch '20220623threadpool' of https://github.com/farawayliu/so…
Jun 28, 2022
a69d1da
refactor bad smell
Jun 28, 2022
876b3cb
refactor bad smell
Jun 29, 2022
9b171c2
fix groupid is null
Jun 29, 2022
37b423b
fix groupid is null
Jun 29, 2022
11d29ca
NodeTest.assertReadIndex try-finally
Jun 29, 2022
da42eb9
fix bad smell, imports layout
Jun 30, 2022
ac6e34e
format code and fix bad smell
shihuili1218 Jul 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static RaftTimerFactory raftTimerFactory() {
* @return true if bootstrap success
*/
public static boolean bootstrap(final BootstrapOptions opts) throws InterruptedException {
final NodeImpl node = new NodeImpl();
final NodeImpl node = new NodeImpl(opts.getGroupId(), null);
shihuili1218 marked this conversation as resolved.
Show resolved Hide resolved
final boolean ret = node.bootstrap(opts);
node.shutdown();
node.join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ default boolean addReplicator(final PeerId peer, ReplicatorType replicatorType)
/**
* Wait the peer catchup.
*/
boolean waitCaughtUp(final PeerId peer, final long maxMargin, final long dueTime, final CatchUpClosure done);
boolean waitCaughtUp(final String groupId, final PeerId peer, final long maxMargin, final long dueTime, final CatchUpClosure done);

/**
* Get peer's last rpc send timestamp (monotonic time in milliseconds).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.util.OnlyForTest;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.Utils;
import com.alipay.sofa.jraft.util.ThreadPoolsFactory;

/**
* Closure queue implementation.
Expand All @@ -42,6 +42,7 @@ public class ClosureQueueImpl implements ClosureQueue {

private static final Logger LOG = LoggerFactory.getLogger(ClosureQueueImpl.class);

private String groupId;
private final Lock lock;
private long firstIndex;
private LinkedList<Closure> queue;
Expand All @@ -63,6 +64,11 @@ public ClosureQueueImpl() {
this.queue = new LinkedList<>();
}

public ClosureQueueImpl(final String groupId) {
shihuili1218 marked this conversation as resolved.
Show resolved Hide resolved
this();
this.groupId = groupId;
}

@Override
public void clear() {
List<Closure> savedQueue;
Expand All @@ -76,7 +82,7 @@ public void clear() {
}

final Status status = new Status(RaftError.EPERM, "Leader stepped down");
Utils.runInThread(() -> {
ThreadPoolsFactory.runInThread(this.groupId, () -> {
for (final Closure done : savedQueue) {
if (done != null) {
done.run(status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.OnlyForTest;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.ThreadPoolsFactory;
import com.alipay.sofa.jraft.util.Utils;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
Expand Down Expand Up @@ -222,7 +223,8 @@ public synchronized void shutdown() {
if (this.taskQueue != null) {
final CountDownLatch latch = new CountDownLatch(1);
this.shutdownLatch = latch;
Utils.runInThread(() -> this.taskQueue.publishEvent((task, sequence) -> {

ThreadPoolsFactory.runInThread(getNode().getGroupId(), () -> this.taskQueue.publishEvent((task, sequence) -> {
task.reset();
task.type = TaskType.SHUTDOWN;
task.shutdownLatch = latch;
Expand Down Expand Up @@ -468,6 +470,7 @@ public boolean isRunningOnFSMThread() {
return Thread.currentThread() == fsmThread;
}

@Override
public void onSnapshotSaveSync(SaveSnapshotClosure done) {
ApplyTask task = new ApplyTask();
task.type = TaskType.SNAPSHOT_SAVE;
Expand Down Expand Up @@ -595,8 +598,8 @@ private void doSnapshotSave(final SaveSnapshotClosure done) {
final ConfigurationEntry confEntry = this.logManager.getConfiguration(lastAppliedIndex);
if (confEntry == null || confEntry.isEmpty()) {
LOG.error("Empty conf entry for lastAppliedIndex={}", lastAppliedIndex);
Utils.runClosureInThread(done, new Status(RaftError.EINVAL, "Empty conf entry for lastAppliedIndex=%s",
lastAppliedIndex));
ThreadPoolsFactory.runClosureInThread(getNode().getGroupId(), done, new Status(RaftError.EINVAL,
"Empty conf entry for lastAppliedIndex=%s", lastAppliedIndex));
return;
}
for (final PeerId peer : confEntry.getConf()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.concurrent.atomic.AtomicLong;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.StateMachine;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.EnumOutter;
import com.alipay.sofa.jraft.entity.LogEntry;
Expand All @@ -29,6 +28,7 @@
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.storage.LogManager;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.ThreadPoolsFactory;
import com.alipay.sofa.jraft.util.Utils;

/**
Expand Down Expand Up @@ -134,7 +134,7 @@ protected void runTheRestClosureWithError() {
Requires.requireNonNull(this.error, "error");
Requires.requireNonNull(this.error.getStatus(), "error.status");
final Status status = this.error.getStatus();
Utils.runClosureInThread(done, status);
ThreadPoolsFactory.runClosureInThread(this.fsmCaller.getNode().getGroupId(), done, status);
}
}
}
Expand Down
60 changes: 36 additions & 24 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import com.alipay.sofa.jraft.util.SignalHelper;
import com.alipay.sofa.jraft.util.SystemPropertyUtil;
import com.alipay.sofa.jraft.util.ThreadId;
import com.alipay.sofa.jraft.util.ThreadPoolsFactory;
import com.alipay.sofa.jraft.util.Utils;
import com.alipay.sofa.jraft.util.concurrent.LongHeldDetectingReadWriteLock;
import com.alipay.sofa.jraft.util.timer.RaftTimerFactory;
Expand Down Expand Up @@ -363,13 +364,15 @@ public ConfigurationCtx(final NodeImpl node) {
void start(final Configuration oldConf, final Configuration newConf, final Closure done) {
if (isBusy()) {
if (done != null) {
Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Already in busy stage."));
ThreadPoolsFactory.runClosureInThread(this.node.getGroupId(), done, new Status(RaftError.EBUSY,
"Already in busy stage."));
}
throw new IllegalStateException("Busy stage");
}
if (this.done != null) {
if (done != null) {
Utils.runClosureInThread(done, new Status(RaftError.EINVAL, "Already have done closure."));
ThreadPoolsFactory.runClosureInThread(this.node.getGroupId(), done, new Status(RaftError.EINVAL,
"Already have done closure."));
}
throw new IllegalArgumentException("Already have done closure");
}
Expand Down Expand Up @@ -403,8 +406,8 @@ private void addNewPeers(final Configuration adding) {
}
final OnCaughtUp caughtUp = new OnCaughtUp(this.node, this.node.currTerm, newPeer, this.version);
final long dueTime = Utils.nowMs() + this.node.options.getElectionTimeoutMs();
if (!this.node.replicatorGroup.waitCaughtUp(newPeer, this.node.options.getCatchupMargin(), dueTime,
caughtUp)) {
if (!this.node.replicatorGroup.waitCaughtUp(this.node.getGroupId(), newPeer,
this.node.options.getCatchupMargin(), dueTime, caughtUp)) {
LOG.error("Node {} waitCaughtUp, peer={}.", this.node.getNodeId(), newPeer);
onCaughtUp(this.version, newPeer, false);
return;
Expand Down Expand Up @@ -463,8 +466,8 @@ void reset(final Status st) {
this.stage = Stage.STAGE_NONE;
this.nchanges = 0;
if (this.done != null) {
Utils.runClosureInThread(this.done, st != null ? st : new Status(RaftError.EPERM,
"Leader stepped down."));
ThreadPoolsFactory.runClosureInThread(this.node.getGroupId(), this.done, st != null ? st : new Status(
RaftError.EPERM, "Leader stepped down."));
this.done = null;
}
}
Expand Down Expand Up @@ -577,6 +580,7 @@ private boolean initLogStorage() {
this.logStorage = this.serviceFactory.createLogStorage(this.options.getLogUri(), this.raftOptions);
this.logManager = new LogManagerImpl();
final LogManagerOptions opts = new LogManagerOptions();
opts.setGroupId(this.groupId);
opts.setLogEntryCodecFactory(this.serviceFactory.createLogEntryCodecFactory());
opts.setLogStorage(this.logStorage);
opts.setConfigurationManager(this.configManager);
Expand Down Expand Up @@ -610,7 +614,7 @@ private void handleSnapshotTimeout() {
this.writeLock.unlock();
}
// do_snapshot in another thread to avoid blocking the timer thread.
Utils.runInThread(() -> doSnapshot(null, false));
ThreadPoolsFactory.runInThread(this.groupId, () -> doSnapshot(null, false));
}

private void handleElectionTimeout() {
Expand Down Expand Up @@ -754,7 +758,7 @@ private boolean initFSMCaller(final LogId bootstrapId) {
LOG.error("Fail to init fsm caller, null instance, bootstrapId={}.", bootstrapId);
return false;
}
this.closureQueue = new ClosureQueueImpl();
this.closureQueue = new ClosureQueueImpl(this.groupId);
final FSMCallerOptions opts = new FSMCallerOptions();
opts.setAfterShutdown(status -> afterShutdown());
opts.setLogManager(this.logManager);
Expand Down Expand Up @@ -906,6 +910,10 @@ public boolean init(final NodeOptions opts) {
return false;
}

if (this.options.getAppendEntriesExecutors() == null) {
this.options.setAppendEntriesExecutors(Utils.getDefaultAppendEntriesExecutor());
}

this.timerManager = TIMER_FACTORY.getRaftScheduler(this.options.isSharedTimerPool(),
this.options.getTimerPoolSize(), "JRaft-Node-ScheduleThreadPool");

Expand Down Expand Up @@ -1044,7 +1052,7 @@ protected int adjustTimeout(final int timeoutMs) {

// TODO RPC service and ReplicatorGroup is in cycle dependent, refactor it
this.replicatorGroup = new ReplicatorGroupImpl();
this.rpcService = new DefaultRaftClientService(this.replicatorGroup);
this.rpcService = new DefaultRaftClientService(this.replicatorGroup, this.options.getAppendEntriesExecutors());
final ReplicatorGroupOptions rgOpts = new ReplicatorGroupOptions();
rgOpts.setHeartbeatTimeoutMs(heartbeatTimeout(this.options.getElectionTimeoutMs()));
rgOpts.setElectionTimeoutMs(this.options.getElectionTimeoutMs());
Expand Down Expand Up @@ -1365,7 +1373,7 @@ private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) {
LOG.debug("Node {} can't apply, status={}.", getNodeId(), st);
final List<Closure> dones = tasks.stream().map(ele -> ele.done)
.filter(Objects::nonNull).collect(Collectors.toList());
Utils.runInThread(() -> {
ThreadPoolsFactory.runInThread(this.groupId, () -> {
for (final Closure done : dones) {
done.run(st);
}
Expand All @@ -1381,14 +1389,14 @@ private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) {
if (task.done != null) {
final Status st = new Status(RaftError.EPERM, "expected_term=%d doesn't match current_term=%d",
task.expectedTerm, this.currTerm);
Utils.runClosureInThread(task.done, st);
ThreadPoolsFactory.runClosureInThread(this.groupId, task.done, st);
task.reset();
}
continue;
}
if (!this.ballotBox.appendPendingTask(this.conf.getConf(),
this.conf.isStable() ? null : this.conf.getOldConf(), task.done)) {
Utils.runClosureInThread(task.done, new Status(RaftError.EINTERNAL, "Fail to append task."));
ThreadPoolsFactory.runClosureInThread(this.groupId, task.done, new Status(RaftError.EINTERNAL, "Fail to append task."));
task.reset();
continue;
}
Expand Down Expand Up @@ -1428,7 +1436,8 @@ public JRaftServiceFactory getServiceFactory() {
@Override
public void readIndex(final byte[] requestContext, final ReadIndexClosure done) {
if (this.shutdownLatch != null) {
Utils.runClosureInThread(done, new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
ThreadPoolsFactory.runClosureInThread(this.groupId, done, new Status(RaftError.ENODESHUTDOWN,
"Node is shutting down."));
throw new IllegalStateException("Node is shutting down");
}
Requires.requireNonNull(done, "Null closure");
Expand Down Expand Up @@ -1603,7 +1612,7 @@ private void readLeader(final ReadIndexRequest request, final ReadIndexResponse.
@Override
public void apply(final Task task) {
if (this.shutdownLatch != null) {
Utils.runClosureInThread(task.getDone(), new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
ThreadPoolsFactory.runClosureInThread(this.groupId, task.getDone(), new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
throw new IllegalStateException("Node is shutting down");
}
Requires.requireNonNull(task, "Null task");
Expand All @@ -1626,7 +1635,7 @@ public void apply(final Task task) {
default:
if (!this.applyQueue.tryPublishEvent(translator)) {
String errorMsg = "Node is busy, has too many tasks, queue is full and bufferSize="+ this.applyQueue.getBufferSize();
Utils.runClosureInThread(task.getDone(),
ThreadPoolsFactory.runClosureInThread(this.groupId, task.getDone(),
new Status(RaftError.EBUSY, errorMsg));
LOG.warn("Node {} applyQueue is overload.", getNodeId());
this.metrics.recordTimes("apply-task-overload-times", 1);
Expand Down Expand Up @@ -2176,7 +2185,8 @@ private void onCaughtUp(final PeerId peer, final long term, final long version,
LOG.debug("Node {} waits peer {} to catch up.", getNodeId(), peer);
final OnCaughtUp caughtUp = new OnCaughtUp(this, term, peer, version);
final long dueTime = Utils.nowMs() + this.options.getElectionTimeoutMs();
if (this.replicatorGroup.waitCaughtUp(peer, this.options.getCatchupMargin(), dueTime, caughtUp)) {
if (this.replicatorGroup.waitCaughtUp(this.groupId, peer, this.options.getCatchupMargin(), dueTime,
caughtUp)) {
return;
}
LOG.warn("Node {} waitCaughtUp failed, peer={}.", getNodeId(), peer);
Expand Down Expand Up @@ -2344,7 +2354,8 @@ private void unsafeApplyConfiguration(final Configuration newConf, final Configu
final ConfigurationChangeDone configurationChangeDone = new ConfigurationChangeDone(this.currTerm, leaderStart);
// Use the new_conf to deal the quorum of this very log
if (!this.ballotBox.appendPendingTask(newConf, oldConf, configurationChangeDone)) {
Utils.runClosureInThread(configurationChangeDone, new Status(RaftError.EINTERNAL, "Fail to append task."));
ThreadPoolsFactory.runClosureInThread(this.groupId, configurationChangeDone, new Status(
RaftError.EINTERNAL, "Fail to append task."));
return;
}
final List<LogEntry> entries = new ArrayList<>();
Expand All @@ -2369,21 +2380,22 @@ private void unsafeRegisterConfChange(final Configuration oldConf, final Configu
} else {
status.setError(RaftError.EPERM, "Not leader");
}
Utils.runClosureInThread(done, status);
ThreadPoolsFactory.runClosureInThread(this.groupId, done, status);
}
return;
}
// check concurrent conf change
if (this.confCtx.isBusy()) {
LOG.warn("Node {} refused configuration concurrent changing.", getNodeId());
if (done != null) {
Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Doing another configuration change."));
ThreadPoolsFactory.runClosureInThread(this.groupId, done, new Status(RaftError.EBUSY,
"Doing another configuration change."));
}
return;
}
// Return immediately when the new peers equals to current configuration
if (this.conf.getConf().equals(newConf)) {
Utils.runClosureInThread(done, Status.OK());
ThreadPoolsFactory.runClosureInThread(this.groupId, done, Status.OK());
return;
}
this.confCtx.start(oldConf, newConf, done);
Expand All @@ -2405,7 +2417,7 @@ private void afterShutdown() {
}
if (savedDoneList != null) {
for (final Closure closure : savedDoneList) {
Utils.runClosureInThread(closure);
ThreadPoolsFactory.runClosureInThread(this.groupId, closure);
}
}
}
Expand Down Expand Up @@ -2787,7 +2799,7 @@ public void shutdown(Closure done) {
if (this.applyQueue != null) {
final CountDownLatch latch = new CountDownLatch(1);
this.shutdownLatch = latch;
Utils.runInThread(
ThreadPoolsFactory.runInThread(this.groupId,
() -> this.applyQueue.publishEvent((event, sequence) -> event.shutdownLatch = latch));
} else {
final int num = GLOBAL_NUM_NODES.decrementAndGet();
Expand All @@ -2814,7 +2826,7 @@ public void shutdown(Closure done) {
}
// Call join() asynchronously
final Closure shutdownHook = done;
Utils.runInThread(() -> {
ThreadPoolsFactory.runInThread(this.groupId, () -> {
try {
join();
} catch (InterruptedException e) {
Expand Down Expand Up @@ -3148,7 +3160,7 @@ private void doSnapshot(final Closure done, boolean sync) {
} else {
if (done != null) {
final Status status = new Status(RaftError.EINVAL, "Snapshot is not supported");
Utils.runClosureInThread(done, status);
ThreadPoolsFactory.runClosureInThread(this.groupId, done, status);
}
}
}
Expand Down
Loading