Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

custom thread pool #855

Merged
merged 31 commits into from
Jul 14, 2022
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
93b85f2
custom thread pool, todo: DefaultFixedThreadsExecutorGroupFactory.toS…
Jun 23, 2022
9eb4471
format
Jun 23, 2022
9ffe3f4
refactor
Jun 24, 2022
f176f78
add ThreadGroup
Jun 24, 2022
01a36ab
add ThreadGroup
Jun 24, 2022
852185d
Utils.runInThread
shihuili1218 Jun 24, 2022
3a04f14
Utils.runClosureInThread(com.alipay.sofa.jraft.Closure, com.alipay.so…
shihuili1218 Jun 24, 2022
ba5981e
Utils.runClosureInThread(com.alipay.sofa.jraft.Closure)
shihuili1218 Jun 24, 2022
a834d38
format
shihuili1218 Jun 24, 2022
2a8d793
unit test
shihuili1218 Jun 24, 2022
35f74d6
unit test
shihuili1218 Jun 25, 2022
afe98e0
unit test
shihuili1218 Jun 25, 2022
ea0f46b
unit test
shihuili1218 Jun 25, 2022
d8df725
unit test
shihuili1218 Jun 25, 2022
0dbc896
unit test
shihuili1218 Jun 26, 2022
a76e0d6
format
shihuili1218 Jun 26, 2022
5dac633
format
shihuili1218 Jun 26, 2022
241175c
format
shihuili1218 Jun 26, 2022
78983cb
use global threadpool to send msg
Jun 28, 2022
94e35ef
use global threadpool to send msg
Jun 28, 2022
b9971ce
format
Jun 23, 2022
5a28fbb
unit test
shihuili1218 Jun 24, 2022
5a50413
use global threadpool to send msg
Jun 28, 2022
7a2b2c3
Merge branch '20220623threadpool' of https://github.com/farawayliu/so…
Jun 28, 2022
a69d1da
refactor bad smell
Jun 28, 2022
876b3cb
refactor bad smell
Jun 29, 2022
9b171c2
fix groupid is null
Jun 29, 2022
37b423b
fix groupid is null
Jun 29, 2022
11d29ca
NodeTest.assertReadIndex try-finally
Jun 29, 2022
da42eb9
fix bad smell, imports layout
Jun 30, 2022
ac6e34e
format code and fix bad smell
shihuili1218 Jul 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ default boolean addReplicator(final PeerId peer, ReplicatorType replicatorType)
/**
* Wait the peer catchup.
*/
boolean waitCaughtUp(final PeerId peer, final long maxMargin, final long dueTime, final CatchUpClosure done);
boolean waitCaughtUp(final String groupId, final PeerId peer, final long maxMargin, final long dueTime, final CatchUpClosure done);

/**
* Get peer's last rpc send timestamp (monotonic time in milliseconds).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.util.OnlyForTest;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.Utils;
import com.alipay.sofa.jraft.util.ThreadPoolsFactory;

/**
* Closure queue implementation.
Expand All @@ -42,6 +42,7 @@ public class ClosureQueueImpl implements ClosureQueue {

private static final Logger LOG = LoggerFactory.getLogger(ClosureQueueImpl.class);

private String groupId;
private final Lock lock;
private long firstIndex;
private LinkedList<Closure> queue;
Expand All @@ -63,6 +64,11 @@ public ClosureQueueImpl() {
this.queue = new LinkedList<>();
}

public ClosureQueueImpl(final String groupId) {
shihuili1218 marked this conversation as resolved.
Show resolved Hide resolved
this();
this.groupId = groupId;
}

@Override
public void clear() {
List<Closure> savedQueue;
Expand All @@ -76,7 +82,7 @@ public void clear() {
}

final Status status = new Status(RaftError.EPERM, "Leader stepped down");
Utils.runInThread(() -> {
ThreadPoolsFactory.runInThread(this.groupId, () -> {
for (final Closure done : savedQueue) {
if (done != null) {
done.run(status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.OnlyForTest;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.ThreadPoolsFactory;
import com.alipay.sofa.jraft.util.Utils;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
Expand Down Expand Up @@ -222,7 +223,8 @@ public synchronized void shutdown() {
if (this.taskQueue != null) {
final CountDownLatch latch = new CountDownLatch(1);
this.shutdownLatch = latch;
Utils.runInThread(() -> this.taskQueue.publishEvent((task, sequence) -> {

ThreadPoolsFactory.runInThread(getNode().getGroupId(), () -> this.taskQueue.publishEvent((task, sequence) -> {
task.reset();
task.type = TaskType.SHUTDOWN;
task.shutdownLatch = latch;
Expand Down Expand Up @@ -468,6 +470,7 @@ public boolean isRunningOnFSMThread() {
return Thread.currentThread() == fsmThread;
}

@Override
public void onSnapshotSaveSync(SaveSnapshotClosure done) {
ApplyTask task = new ApplyTask();
task.type = TaskType.SNAPSHOT_SAVE;
Expand Down Expand Up @@ -595,8 +598,8 @@ private void doSnapshotSave(final SaveSnapshotClosure done) {
final ConfigurationEntry confEntry = this.logManager.getConfiguration(lastAppliedIndex);
if (confEntry == null || confEntry.isEmpty()) {
LOG.error("Empty conf entry for lastAppliedIndex={}", lastAppliedIndex);
Utils.runClosureInThread(done, new Status(RaftError.EINVAL, "Empty conf entry for lastAppliedIndex=%s",
lastAppliedIndex));
ThreadPoolsFactory.runClosureInThread(getNode().getGroupId(), done, new Status(RaftError.EINVAL,
"Empty conf entry for lastAppliedIndex=%s", lastAppliedIndex));
return;
}
for (final PeerId peer : confEntry.getConf()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.concurrent.atomic.AtomicLong;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.StateMachine;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.EnumOutter;
import com.alipay.sofa.jraft.entity.LogEntry;
Expand All @@ -29,6 +28,7 @@
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.storage.LogManager;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.ThreadPoolsFactory;
import com.alipay.sofa.jraft.util.Utils;

/**
Expand Down Expand Up @@ -134,7 +134,7 @@ protected void runTheRestClosureWithError() {
Requires.requireNonNull(this.error, "error");
Requires.requireNonNull(this.error.getStatus(), "error.status");
final Status status = this.error.getStatus();
Utils.runClosureInThread(done, status);
ThreadPoolsFactory.runClosureInThread(this.fsmCaller.getNode().getGroupId(), done, status);
}
}
}
Expand Down
Loading