diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/JRaftUtils.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/JRaftUtils.java index 020137fe2..6335ee1c6 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/JRaftUtils.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/JRaftUtils.java @@ -61,7 +61,7 @@ public static RaftTimerFactory raftTimerFactory() { * @return true if bootstrap success */ public static boolean bootstrap(final BootstrapOptions opts) throws InterruptedException { - final NodeImpl node = new NodeImpl(); + final NodeImpl node = new NodeImpl(opts.getGroupId(), null); final boolean ret = node.bootstrap(opts); node.shutdown(); node.join(); diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/ReplicatorGroup.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/ReplicatorGroup.java index 70541944b..7615ab99d 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/ReplicatorGroup.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/ReplicatorGroup.java @@ -121,7 +121,7 @@ default boolean addReplicator(final PeerId peer, ReplicatorType replicatorType) /** * Wait the peer catchup. */ - boolean waitCaughtUp(final PeerId peer, final long maxMargin, final long dueTime, final CatchUpClosure done); + boolean waitCaughtUp(final String groupId, final PeerId peer, final long maxMargin, final long dueTime, final CatchUpClosure done); /** * Get peer's last rpc send timestamp (monotonic time in milliseconds). diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/closure/ClosureQueueImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/closure/ClosureQueueImpl.java index 10e9ed041..90dff35d9 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/closure/ClosureQueueImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/closure/ClosureQueueImpl.java @@ -29,7 +29,7 @@ import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.util.OnlyForTest; import com.alipay.sofa.jraft.util.Requires; -import com.alipay.sofa.jraft.util.Utils; +import com.alipay.sofa.jraft.util.ThreadPoolsFactory; /** * Closure queue implementation. @@ -42,6 +42,7 @@ public class ClosureQueueImpl implements ClosureQueue { private static final Logger LOG = LoggerFactory.getLogger(ClosureQueueImpl.class); + private String groupId; private final Lock lock; private long firstIndex; private LinkedList queue; @@ -63,6 +64,11 @@ public ClosureQueueImpl() { this.queue = new LinkedList<>(); } + public ClosureQueueImpl(final String groupId) { + this(); + this.groupId = groupId; + } + @Override public void clear() { List savedQueue; @@ -76,7 +82,7 @@ public void clear() { } final Status status = new Status(RaftError.EPERM, "Leader stepped down"); - Utils.runInThread(() -> { + ThreadPoolsFactory.runInThread(this.groupId, () -> { for (final Closure done : savedQueue) { if (done != null) { done.run(status); diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/FSMCallerImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/FSMCallerImpl.java index 60d9753a4..b43c2b4c9 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/FSMCallerImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/FSMCallerImpl.java @@ -53,6 +53,7 @@ import com.alipay.sofa.jraft.util.NamedThreadFactory; import com.alipay.sofa.jraft.util.OnlyForTest; import com.alipay.sofa.jraft.util.Requires; +import com.alipay.sofa.jraft.util.ThreadPoolsFactory; import com.alipay.sofa.jraft.util.Utils; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventFactory; @@ -222,7 +223,8 @@ public synchronized void shutdown() { if (this.taskQueue != null) { final CountDownLatch latch = new CountDownLatch(1); this.shutdownLatch = latch; - Utils.runInThread(() -> this.taskQueue.publishEvent((task, sequence) -> { + + ThreadPoolsFactory.runInThread(getNode().getGroupId(), () -> this.taskQueue.publishEvent((task, sequence) -> { task.reset(); task.type = TaskType.SHUTDOWN; task.shutdownLatch = latch; @@ -468,6 +470,7 @@ public boolean isRunningOnFSMThread() { return Thread.currentThread() == fsmThread; } + @Override public void onSnapshotSaveSync(SaveSnapshotClosure done) { ApplyTask task = new ApplyTask(); task.type = TaskType.SNAPSHOT_SAVE; @@ -595,8 +598,8 @@ private void doSnapshotSave(final SaveSnapshotClosure done) { final ConfigurationEntry confEntry = this.logManager.getConfiguration(lastAppliedIndex); if (confEntry == null || confEntry.isEmpty()) { LOG.error("Empty conf entry for lastAppliedIndex={}", lastAppliedIndex); - Utils.runClosureInThread(done, new Status(RaftError.EINVAL, "Empty conf entry for lastAppliedIndex=%s", - lastAppliedIndex)); + ThreadPoolsFactory.runClosureInThread(getNode().getGroupId(), done, new Status(RaftError.EINVAL, + "Empty conf entry for lastAppliedIndex=%s", lastAppliedIndex)); return; } for (final PeerId peer : confEntry.getConf()) { diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/IteratorImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/IteratorImpl.java index fd69e712f..ce0f92bee 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/IteratorImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/IteratorImpl.java @@ -20,7 +20,6 @@ import java.util.concurrent.atomic.AtomicLong; import com.alipay.sofa.jraft.Closure; -import com.alipay.sofa.jraft.StateMachine; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.entity.EnumOutter; import com.alipay.sofa.jraft.entity.LogEntry; @@ -29,6 +28,7 @@ import com.alipay.sofa.jraft.error.RaftException; import com.alipay.sofa.jraft.storage.LogManager; import com.alipay.sofa.jraft.util.Requires; +import com.alipay.sofa.jraft.util.ThreadPoolsFactory; import com.alipay.sofa.jraft.util.Utils; /** @@ -134,7 +134,7 @@ protected void runTheRestClosureWithError() { Requires.requireNonNull(this.error, "error"); Requires.requireNonNull(this.error.getStatus(), "error.status"); final Status status = this.error.getStatus(); - Utils.runClosureInThread(done, status); + ThreadPoolsFactory.runClosureInThread(this.fsmCaller.getNode().getGroupId(), done, status); } } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java index fd68d5278..37c1e5f86 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java @@ -118,6 +118,7 @@ import com.alipay.sofa.jraft.util.SignalHelper; import com.alipay.sofa.jraft.util.SystemPropertyUtil; import com.alipay.sofa.jraft.util.ThreadId; +import com.alipay.sofa.jraft.util.ThreadPoolsFactory; import com.alipay.sofa.jraft.util.Utils; import com.alipay.sofa.jraft.util.concurrent.LongHeldDetectingReadWriteLock; import com.alipay.sofa.jraft.util.timer.RaftTimerFactory; @@ -363,13 +364,15 @@ public ConfigurationCtx(final NodeImpl node) { void start(final Configuration oldConf, final Configuration newConf, final Closure done) { if (isBusy()) { if (done != null) { - Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Already in busy stage.")); + ThreadPoolsFactory.runClosureInThread(this.node.getGroupId(), done, new Status(RaftError.EBUSY, + "Already in busy stage.")); } throw new IllegalStateException("Busy stage"); } if (this.done != null) { if (done != null) { - Utils.runClosureInThread(done, new Status(RaftError.EINVAL, "Already have done closure.")); + ThreadPoolsFactory.runClosureInThread(this.node.getGroupId(), done, new Status(RaftError.EINVAL, + "Already have done closure.")); } throw new IllegalArgumentException("Already have done closure"); } @@ -403,8 +406,8 @@ private void addNewPeers(final Configuration adding) { } final OnCaughtUp caughtUp = new OnCaughtUp(this.node, this.node.currTerm, newPeer, this.version); final long dueTime = Utils.nowMs() + this.node.options.getElectionTimeoutMs(); - if (!this.node.replicatorGroup.waitCaughtUp(newPeer, this.node.options.getCatchupMargin(), dueTime, - caughtUp)) { + if (!this.node.replicatorGroup.waitCaughtUp(this.node.getGroupId(), newPeer, + this.node.options.getCatchupMargin(), dueTime, caughtUp)) { LOG.error("Node {} waitCaughtUp, peer={}.", this.node.getNodeId(), newPeer); onCaughtUp(this.version, newPeer, false); return; @@ -463,8 +466,8 @@ void reset(final Status st) { this.stage = Stage.STAGE_NONE; this.nchanges = 0; if (this.done != null) { - Utils.runClosureInThread(this.done, st != null ? st : new Status(RaftError.EPERM, - "Leader stepped down.")); + ThreadPoolsFactory.runClosureInThread(this.node.getGroupId(), this.done, st != null ? st : new Status( + RaftError.EPERM, "Leader stepped down.")); this.done = null; } } @@ -577,6 +580,7 @@ private boolean initLogStorage() { this.logStorage = this.serviceFactory.createLogStorage(this.options.getLogUri(), this.raftOptions); this.logManager = new LogManagerImpl(); final LogManagerOptions opts = new LogManagerOptions(); + opts.setGroupId(this.groupId); opts.setLogEntryCodecFactory(this.serviceFactory.createLogEntryCodecFactory()); opts.setLogStorage(this.logStorage); opts.setConfigurationManager(this.configManager); @@ -610,7 +614,7 @@ private void handleSnapshotTimeout() { this.writeLock.unlock(); } // do_snapshot in another thread to avoid blocking the timer thread. - Utils.runInThread(() -> doSnapshot(null, false)); + ThreadPoolsFactory.runInThread(this.groupId, () -> doSnapshot(null, false)); } private void handleElectionTimeout() { @@ -754,7 +758,7 @@ private boolean initFSMCaller(final LogId bootstrapId) { LOG.error("Fail to init fsm caller, null instance, bootstrapId={}.", bootstrapId); return false; } - this.closureQueue = new ClosureQueueImpl(); + this.closureQueue = new ClosureQueueImpl(this.groupId); final FSMCallerOptions opts = new FSMCallerOptions(); opts.setAfterShutdown(status -> afterShutdown()); opts.setLogManager(this.logManager); @@ -906,6 +910,10 @@ public boolean init(final NodeOptions opts) { return false; } + if (this.options.getAppendEntriesExecutors() == null) { + this.options.setAppendEntriesExecutors(Utils.getDefaultAppendEntriesExecutor()); + } + this.timerManager = TIMER_FACTORY.getRaftScheduler(this.options.isSharedTimerPool(), this.options.getTimerPoolSize(), "JRaft-Node-ScheduleThreadPool"); @@ -1044,7 +1052,7 @@ protected int adjustTimeout(final int timeoutMs) { // TODO RPC service and ReplicatorGroup is in cycle dependent, refactor it this.replicatorGroup = new ReplicatorGroupImpl(); - this.rpcService = new DefaultRaftClientService(this.replicatorGroup); + this.rpcService = new DefaultRaftClientService(this.replicatorGroup, this.options.getAppendEntriesExecutors()); final ReplicatorGroupOptions rgOpts = new ReplicatorGroupOptions(); rgOpts.setHeartbeatTimeoutMs(heartbeatTimeout(this.options.getElectionTimeoutMs())); rgOpts.setElectionTimeoutMs(this.options.getElectionTimeoutMs()); @@ -1365,7 +1373,7 @@ private void executeApplyingTasks(final List tasks) { LOG.debug("Node {} can't apply, status={}.", getNodeId(), st); final List dones = tasks.stream().map(ele -> ele.done) .filter(Objects::nonNull).collect(Collectors.toList()); - Utils.runInThread(() -> { + ThreadPoolsFactory.runInThread(this.groupId, () -> { for (final Closure done : dones) { done.run(st); } @@ -1381,14 +1389,14 @@ private void executeApplyingTasks(final List tasks) { if (task.done != null) { final Status st = new Status(RaftError.EPERM, "expected_term=%d doesn't match current_term=%d", task.expectedTerm, this.currTerm); - Utils.runClosureInThread(task.done, st); + ThreadPoolsFactory.runClosureInThread(this.groupId, task.done, st); task.reset(); } continue; } if (!this.ballotBox.appendPendingTask(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf(), task.done)) { - Utils.runClosureInThread(task.done, new Status(RaftError.EINTERNAL, "Fail to append task.")); + ThreadPoolsFactory.runClosureInThread(this.groupId, task.done, new Status(RaftError.EINTERNAL, "Fail to append task.")); task.reset(); continue; } @@ -1428,7 +1436,8 @@ public JRaftServiceFactory getServiceFactory() { @Override public void readIndex(final byte[] requestContext, final ReadIndexClosure done) { if (this.shutdownLatch != null) { - Utils.runClosureInThread(done, new Status(RaftError.ENODESHUTDOWN, "Node is shutting down.")); + ThreadPoolsFactory.runClosureInThread(this.groupId, done, new Status(RaftError.ENODESHUTDOWN, + "Node is shutting down.")); throw new IllegalStateException("Node is shutting down"); } Requires.requireNonNull(done, "Null closure"); @@ -1603,7 +1612,7 @@ private void readLeader(final ReadIndexRequest request, final ReadIndexResponse. @Override public void apply(final Task task) { if (this.shutdownLatch != null) { - Utils.runClosureInThread(task.getDone(), new Status(RaftError.ENODESHUTDOWN, "Node is shutting down.")); + ThreadPoolsFactory.runClosureInThread(this.groupId, task.getDone(), new Status(RaftError.ENODESHUTDOWN, "Node is shutting down.")); throw new IllegalStateException("Node is shutting down"); } Requires.requireNonNull(task, "Null task"); @@ -1626,7 +1635,7 @@ public void apply(final Task task) { default: if (!this.applyQueue.tryPublishEvent(translator)) { String errorMsg = "Node is busy, has too many tasks, queue is full and bufferSize="+ this.applyQueue.getBufferSize(); - Utils.runClosureInThread(task.getDone(), + ThreadPoolsFactory.runClosureInThread(this.groupId, task.getDone(), new Status(RaftError.EBUSY, errorMsg)); LOG.warn("Node {} applyQueue is overload.", getNodeId()); this.metrics.recordTimes("apply-task-overload-times", 1); @@ -2176,7 +2185,8 @@ private void onCaughtUp(final PeerId peer, final long term, final long version, LOG.debug("Node {} waits peer {} to catch up.", getNodeId(), peer); final OnCaughtUp caughtUp = new OnCaughtUp(this, term, peer, version); final long dueTime = Utils.nowMs() + this.options.getElectionTimeoutMs(); - if (this.replicatorGroup.waitCaughtUp(peer, this.options.getCatchupMargin(), dueTime, caughtUp)) { + if (this.replicatorGroup.waitCaughtUp(this.groupId, peer, this.options.getCatchupMargin(), dueTime, + caughtUp)) { return; } LOG.warn("Node {} waitCaughtUp failed, peer={}.", getNodeId(), peer); @@ -2344,7 +2354,8 @@ private void unsafeApplyConfiguration(final Configuration newConf, final Configu final ConfigurationChangeDone configurationChangeDone = new ConfigurationChangeDone(this.currTerm, leaderStart); // Use the new_conf to deal the quorum of this very log if (!this.ballotBox.appendPendingTask(newConf, oldConf, configurationChangeDone)) { - Utils.runClosureInThread(configurationChangeDone, new Status(RaftError.EINTERNAL, "Fail to append task.")); + ThreadPoolsFactory.runClosureInThread(this.groupId, configurationChangeDone, new Status( + RaftError.EINTERNAL, "Fail to append task.")); return; } final List entries = new ArrayList<>(); @@ -2369,7 +2380,7 @@ private void unsafeRegisterConfChange(final Configuration oldConf, final Configu } else { status.setError(RaftError.EPERM, "Not leader"); } - Utils.runClosureInThread(done, status); + ThreadPoolsFactory.runClosureInThread(this.groupId, done, status); } return; } @@ -2377,13 +2388,14 @@ private void unsafeRegisterConfChange(final Configuration oldConf, final Configu if (this.confCtx.isBusy()) { LOG.warn("Node {} refused configuration concurrent changing.", getNodeId()); if (done != null) { - Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Doing another configuration change.")); + ThreadPoolsFactory.runClosureInThread(this.groupId, done, new Status(RaftError.EBUSY, + "Doing another configuration change.")); } return; } // Return immediately when the new peers equals to current configuration if (this.conf.getConf().equals(newConf)) { - Utils.runClosureInThread(done, Status.OK()); + ThreadPoolsFactory.runClosureInThread(this.groupId, done, Status.OK()); return; } this.confCtx.start(oldConf, newConf, done); @@ -2405,7 +2417,7 @@ private void afterShutdown() { } if (savedDoneList != null) { for (final Closure closure : savedDoneList) { - Utils.runClosureInThread(closure); + ThreadPoolsFactory.runClosureInThread(this.groupId, closure); } } } @@ -2787,7 +2799,7 @@ public void shutdown(Closure done) { if (this.applyQueue != null) { final CountDownLatch latch = new CountDownLatch(1); this.shutdownLatch = latch; - Utils.runInThread( + ThreadPoolsFactory.runInThread(this.groupId, () -> this.applyQueue.publishEvent((event, sequence) -> event.shutdownLatch = latch)); } else { final int num = GLOBAL_NUM_NODES.decrementAndGet(); @@ -2814,7 +2826,7 @@ public void shutdown(Closure done) { } // Call join() asynchronously final Closure shutdownHook = done; - Utils.runInThread(() -> { + ThreadPoolsFactory.runInThread(this.groupId, () -> { try { join(); } catch (InterruptedException e) { @@ -3148,7 +3160,7 @@ private void doSnapshot(final Closure done, boolean sync) { } else { if (done != null) { final Status status = new Status(RaftError.EINVAL, "Snapshot is not supported"); - Utils.runClosureInThread(done, status); + ThreadPoolsFactory.runClosureInThread(this.groupId, done, status); } } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java index 8d7fe738f..663879989 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import com.alipay.sofa.jraft.util.ThreadPoolsFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -301,7 +302,7 @@ public synchronized void shutdown() { return; } this.shutdownLatch = new CountDownLatch(1); - Utils.runInThread( // + ThreadPoolsFactory.runInThread( this.node.getGroupId(), () -> this.readIndexQueue.publishEvent((event, sequence) -> event.shutdownLatch = this.shutdownLatch)); this.scheduledExecutorService.shutdown(); } @@ -319,7 +320,7 @@ public void join() throws InterruptedException { @Override public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) { if (this.shutdownLatch != null) { - Utils.runClosureInThread(closure, new Status(RaftError.EHOSTDOWN, "Was stopped")); + ThreadPoolsFactory.runClosureInThread(this.node.getGroupId(), closure, new Status(RaftError.EHOSTDOWN, "Was stopped")); throw new IllegalStateException("Service already shutdown."); } try { @@ -337,7 +338,7 @@ public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) { default: if (!this.readIndexQueue.tryPublishEvent(translator)) { final String errorMsg = "Node is busy, has too many read-index requests, queue is full and bufferSize="+ this.readIndexQueue.getBufferSize(); - Utils.runClosureInThread(closure, + ThreadPoolsFactory.runClosureInThread(this.node.getGroupId(), closure, new Status(RaftError.EBUSY, errorMsg)); this.nodeMetrics.recordTimes("read-index-overload-times", 1); LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId()); @@ -348,7 +349,8 @@ public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) { break; } } catch (final Exception e) { - Utils.runClosureInThread(closure, new Status(RaftError.EPERM, "Node is down.")); + ThreadPoolsFactory.runClosureInThread(this.node.getGroupId(), closure + , new Status(RaftError.EPERM, "Node is down.")); } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java index fdd98c9e8..b40c323a3 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java @@ -62,6 +62,7 @@ import com.alipay.sofa.jraft.util.RecycleUtil; import com.alipay.sofa.jraft.util.Requires; import com.alipay.sofa.jraft.util.ThreadId; +import com.alipay.sofa.jraft.util.ThreadPoolsFactory; import com.alipay.sofa.jraft.util.Utils; import com.alipay.sofa.jraft.util.internal.ThrowUtil; import com.codahale.metrics.Gauge; @@ -921,7 +922,7 @@ private String getReplicatorMetricName(final ReplicatorOptions opts) { return "replicator-" + opts.getNode().getGroupId() + "/" + opts.getPeerId(); } - public static void waitForCaughtUp(final ThreadId id, final long maxMargin, final long dueTime, + public static void waitForCaughtUp(final String groupId, final ThreadId id, final long maxMargin, final long dueTime, final CatchUpClosure done) { final Replicator r = (Replicator) id.lock(); @@ -932,7 +933,7 @@ public static void waitForCaughtUp(final ThreadId id, final long maxMargin, fina try { if (r.catchUpClosure != null) { LOG.error("Previous wait_for_caught_up is not over"); - Utils.runClosureInThread(done, new Status(RaftError.EINVAL, "Duplicated call")); + ThreadPoolsFactory.runClosureInThread(groupId, done, new Status(RaftError.EINVAL, "Duplicated call")); return; } done.setMaxMargin(maxMargin); diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReplicatorGroupImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReplicatorGroupImpl.java index 41530c5d7..349444c8c 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReplicatorGroupImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReplicatorGroupImpl.java @@ -142,13 +142,14 @@ public void clearFailureReplicators() { } @Override - public boolean waitCaughtUp(final PeerId peer, final long maxMargin, final long dueTime, final CatchUpClosure done) { + public boolean waitCaughtUp(final String groupId, final PeerId peer, final long maxMargin, final long dueTime, + final CatchUpClosure done) { final ThreadId rid = this.replicatorMap.get(peer); if (rid == null) { return false; } - Replicator.waitForCaughtUp(rid, maxMargin, dueTime, done); + Replicator.waitForCaughtUp(groupId, rid, maxMargin, dueTime, done); return true; } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/BootstrapOptions.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/BootstrapOptions.java index c726da17a..1f3af23a2 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/BootstrapOptions.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/BootstrapOptions.java @@ -32,6 +32,8 @@ public class BootstrapOptions { public static final JRaftServiceFactory defaultServiceFactory = JRaftServiceLoader.load(JRaftServiceFactory.class) // .first(); + // groupId cannot be empty + private String groupId; // Containing the initial member of this raft group // Default: empty conf @@ -63,6 +65,14 @@ public class BootstrapOptions { */ private JRaftServiceFactory serviceFactory = defaultServiceFactory; + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + public JRaftServiceFactory getServiceFactory() { return serviceFactory; } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/LogManagerOptions.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/LogManagerOptions.java index 92190767b..37a14813e 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/LogManagerOptions.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/LogManagerOptions.java @@ -31,7 +31,7 @@ * 2018-Mar-13 5:15:15 PM */ public class LogManagerOptions { - + private String groupId; private LogStorage logStorage; private ConfigurationManager configurationManager; private FSMCaller fsmCaller; @@ -40,6 +40,14 @@ public class LogManagerOptions { private NodeMetrics nodeMetrics; private LogEntryCodecFactory logEntryCodecFactory = LogEntryV2CodecFactory.getInstance(); + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + public LogEntryCodecFactory getLogEntryCodecFactory() { return this.logEntryCodecFactory; } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/LogStorageOptions.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/LogStorageOptions.java index b3620b2e7..d65a4b45b 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/LogStorageOptions.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/LogStorageOptions.java @@ -26,10 +26,19 @@ */ public class LogStorageOptions { + private String groupId; private ConfigurationManager configurationManager; private LogEntryCodecFactory logEntryCodecFactory; + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + public ConfigurationManager getConfigurationManager() { return this.configurationManager; } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/RpcOptions.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/RpcOptions.java index 0a9d2cfc6..3c75ad7c9 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/RpcOptions.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/RpcOptions.java @@ -16,6 +16,7 @@ */ package com.alipay.sofa.jraft.option; +import com.alipay.sofa.jraft.util.concurrent.FixedThreadsExecutorGroup; import com.codahale.metrics.MetricRegistry; public class RpcOptions { @@ -24,36 +25,42 @@ public class RpcOptions { * Rpc connect timeout in milliseconds * Default: 1000(1s) */ - private int rpcConnectTimeoutMs = 1000; + private int rpcConnectTimeoutMs = 1000; /** * RPC request default timeout in milliseconds * Default: 5000(5s) */ - private int rpcDefaultTimeout = 5000; + private int rpcDefaultTimeout = 5000; /** * Install snapshot RPC request default timeout in milliseconds * Default: 5 * 60 * 1000(5min) */ - private int rpcInstallSnapshotTimeout = 5 * 60 * 1000; + private int rpcInstallSnapshotTimeout = 5 * 60 * 1000; /** * RPC process thread pool size * Default: 80 */ - private int rpcProcessorThreadPoolSize = 80; + private int rpcProcessorThreadPoolSize = 80; /** * Whether to enable checksum for RPC. * Default: false */ - private boolean enableRpcChecksum = false; + private boolean enableRpcChecksum = false; /** * Metric registry for RPC services, user should not use this field. */ - private MetricRegistry metricRegistry; + private MetricRegistry metricRegistry; + + /** + * The thread pool for custom sending AppendEntries. + * How to create: {@link com.alipay.sofa.jraft.util.concurrent.DefaultFixedThreadsExecutorGroupFactory} + */ + private FixedThreadsExecutorGroup appendEntriesExecutors; public int getRpcConnectTimeoutMs() { return this.rpcConnectTimeoutMs; @@ -103,6 +110,14 @@ public void setMetricRegistry(MetricRegistry metricRegistry) { this.metricRegistry = metricRegistry; } + public FixedThreadsExecutorGroup getAppendEntriesExecutors() { + return appendEntriesExecutors; + } + + public void setAppendEntriesExecutors(FixedThreadsExecutorGroup appendEntriesExecutors) { + this.appendEntriesExecutors = appendEntriesExecutors; + } + @Override public String toString() { return "RpcOptions{" + "rpcConnectTimeoutMs=" + rpcConnectTimeoutMs + ", rpcDefaultTimeout=" diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/SnapshotCopierOptions.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/SnapshotCopierOptions.java index 220ca9f8c..c4a85e284 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/SnapshotCopierOptions.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/SnapshotCopierOptions.java @@ -28,6 +28,7 @@ */ public class SnapshotCopierOptions { + private String groupId; private RaftClientService raftClientService; private Scheduler timerManager; private RaftOptions raftOptions; @@ -37,15 +38,24 @@ public SnapshotCopierOptions() { super(); } - public SnapshotCopierOptions(RaftClientService raftClientService, Scheduler timerManager, RaftOptions raftOptions, - NodeOptions nodeOptions) { + public SnapshotCopierOptions(String groupId, RaftClientService raftClientService, Scheduler timerManager, + RaftOptions raftOptions, NodeOptions nodeOptions) { super(); + this.groupId = groupId; this.raftClientService = raftClientService; this.timerManager = timerManager; this.raftOptions = raftOptions; this.nodeOptions = nodeOptions; } + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + public NodeOptions getNodeOptions() { return this.nodeOptions; } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java index 302c8e3fb..fec82f7e1 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java @@ -120,7 +120,6 @@ protected boolean initRpcClient(final int rpcProcessorThreadPoolSize) { if (this.rpcOptions.getMetricRegistry() != null) { this.rpcOptions.getMetricRegistry().register("raft-rpc-client-thread-pool", new ThreadPoolMetricSet(this.rpcExecutor)); - Utils.registerClosureExecutorMetrics(this.rpcOptions.getMetricRegistry()); } return true; } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/DefaultRaftClientService.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/DefaultRaftClientService.java index b1b399576..90d43b8d0 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/DefaultRaftClientService.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/DefaultRaftClientService.java @@ -60,12 +60,7 @@ */ public class DefaultRaftClientService extends AbstractClientService implements RaftClientService { - private static final FixedThreadsExecutorGroup APPEND_ENTRIES_EXECUTORS = DefaultFixedThreadsExecutorGroupFactory.INSTANCE - .newExecutorGroup( - Utils.APPEND_ENTRIES_THREADS_SEND, - "Append-Entries-Thread-Send", - Utils.MAX_APPEND_ENTRIES_TASKS_PER_THREAD, - true); + private final FixedThreadsExecutorGroup appendEntriesExecutors; private final ConcurrentMap appendEntriesExecutorMap = new ConcurrentHashMap<>(); @@ -79,7 +74,15 @@ protected void configRpcClient(final RpcClient rpcClient) { } public DefaultRaftClientService(final ReplicatorGroup rgGroup) { + this(rgGroup, DefaultFixedThreadsExecutorGroupFactory.INSTANCE.newExecutorGroup( + Utils.APPEND_ENTRIES_THREADS_SEND, "Append-Entries-Thread-Send", Utils.MAX_APPEND_ENTRIES_TASKS_PER_THREAD, + true)); + } + + public DefaultRaftClientService(final ReplicatorGroup rgGroup, + final FixedThreadsExecutorGroup customAppendEntriesExecutors) { this.rgGroup = rgGroup; + this.appendEntriesExecutors = customAppendEntriesExecutors; } @Override @@ -114,7 +117,7 @@ public Future requestVote(final Endpoint endpoint, final RequestVoteReq @Override public Future appendEntries(final Endpoint endpoint, final AppendEntriesRequest request, final int timeoutMs, final RpcResponseClosure done) { - final Executor executor = this.appendEntriesExecutorMap.computeIfAbsent(endpoint, k -> APPEND_ENTRIES_EXECUTORS.next()); + final Executor executor = this.appendEntriesExecutorMap.computeIfAbsent(endpoint, k -> appendEntriesExecutors.next()); if (!checkConnection(endpoint, true)) { return onConnectionFail(endpoint, request, done, executor); diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/LogManagerImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/LogManagerImpl.java index f5b101b20..4f08e89f8 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/LogManagerImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/LogManagerImpl.java @@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.alipay.sofa.jraft.util.ThreadPoolsFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,7 +77,7 @@ public class LogManagerImpl implements LogManager { private static final Logger LOG = LoggerFactory .getLogger(LogManagerImpl.class); - + private String groupId; private LogStorage logStorage; private ConfigurationManager configManager; private FSMCaller fsmCaller; @@ -169,12 +170,14 @@ public boolean init(final LogManagerOptions opts) { LOG.error("Fail to init log manager, log storage is null"); return false; } + this.groupId = opts.getGroupId(); this.raftOptions = opts.getRaftOptions(); this.nodeMetrics = opts.getNodeMetrics(); this.logStorage = opts.getLogStorage(); this.configManager = opts.getConfigurationManager(); LogStorageOptions lsOpts = new LogStorageOptions(); + lsOpts.setGroupId(opts.getGroupId()); lsOpts.setConfigurationManager(this.configManager); lsOpts.setLogEntryCodecFactory(opts.getLogEntryCodecFactory()); @@ -221,7 +224,7 @@ public boolean hasAvailableCapacityToAppendEntries(final int requiredCapacity) { private void stopDiskThread() { this.shutDownLatch = new CountDownLatch(1); - Utils.runInThread(() -> this.diskQueue.publishEvent((event, sequence) -> { + ThreadPoolsFactory.runInThread(this.groupId, () -> this.diskQueue.publishEvent((event, sequence) -> { event.reset(); event.type = EventType.SHUTDOWN; })); @@ -297,7 +300,7 @@ public void appendEntries(final List entries, final StableClosure done Requires.requireNonNull(done, "done"); if (this.hasError) { entries.clear(); - Utils.runClosureInThread(done, new Status(RaftError.EIO, "Corrupted LogStorage")); + ThreadPoolsFactory.runClosureInThread(this.groupId, done, new Status(RaftError.EIO, "Corrupted LogStorage")); return; } boolean doUnlock = true; @@ -357,7 +360,7 @@ private void offerEvent(final StableClosure done, final EventType type) { assert(done != null); if (this.stopped) { - Utils.runClosureInThread(done, new Status(RaftError.ESTOP, "Log manager is stopped.")); + ThreadPoolsFactory.runClosureInThread(this.groupId, done, new Status(RaftError.ESTOP, "Log manager is stopped.")); return; } this.diskQueue.publishEvent((event, sequence) -> { @@ -394,7 +397,7 @@ private boolean wakeupAllWaiter(final Lock lock) { for (int i = 0; i < waiterCount; i++) { final WaitMeta wm = wms.get(i); wm.errorCode = errCode; - Utils.runInThread(() -> runOnNewLog(wm)); + ThreadPoolsFactory.runInThread(this.groupId, () -> runOnNewLog(wm)); } return true; } @@ -1014,7 +1017,7 @@ private boolean checkAndResolveConflict(final List entries, final Stab // should check and resolve the conflicts between the local logs and // |entries| if (firstLogEntry.getId().getIndex() > this.lastLogIndex + 1) { - Utils.runClosureInThread(done, new Status(RaftError.EINVAL, + ThreadPoolsFactory.runClosureInThread(this.groupId, done, new Status(RaftError.EINVAL, "There's gap between first_index=%d and last_log_index=%d", firstLogEntry.getId().getIndex(), this.lastLogIndex)); return false; @@ -1026,7 +1029,7 @@ private boolean checkAndResolveConflict(final List entries, final Stab "Received entries of which the lastLog={} is not greater than appliedIndex={}, return immediately with nothing changed.", lastLogEntry.getId().getIndex(), appliedIndex); // Replicate old logs before appliedIndex should be considered successfully, response OK. - Utils.runClosureInThread(done); + ThreadPoolsFactory.runClosureInThread(this.groupId, done); return false; } if (firstLogEntry.getId().getIndex() == this.lastLogIndex + 1) { @@ -1099,7 +1102,7 @@ private long notifyOnNewLog(final long expectedLastLogIndex, final WaitMeta wm) try { if (expectedLastLogIndex != this.lastLogIndex || this.stopped) { wm.errorCode = this.stopped ? RaftError.ESTOP.getNumber() : 0; - Utils.runInThread(() -> runOnNewLog(wm)); + ThreadPoolsFactory.runInThread(this.groupId, () -> runOnNewLog(wm)); return 0L; } long waitId = this.nextWaitId++; diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/RocksDBLogStorage.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/RocksDBLogStorage.java index 4d22c4d9d..1c63e5292 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/RocksDBLogStorage.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/RocksDBLogStorage.java @@ -59,6 +59,7 @@ import com.alipay.sofa.jraft.util.OnlyForTest; import com.alipay.sofa.jraft.util.Requires; import com.alipay.sofa.jraft.util.StorageOptionsFactory; +import com.alipay.sofa.jraft.util.ThreadPoolsFactory; import com.alipay.sofa.jraft.util.Utils; /** @@ -136,6 +137,7 @@ protected static class EmptyWriteContext implements WriteContext { static EmptyWriteContext INSTANCE = new EmptyWriteContext(); } + private String groupId; private final String path; private final boolean sync; private final boolean openStatistics; @@ -182,6 +184,7 @@ public static ColumnFamilyOptions createColumnFamilyOptions() { public boolean init(final LogStorageOptions opts) { Requires.requireNonNull(opts.getConfigurationManager(), "Null conf manager"); Requires.requireNonNull(opts.getLogEntryCodecFactory(), "Null log entry codec factory"); + this.groupId = opts.getGroupId(); this.writeLock.lock(); try { if (this.db != null) { @@ -581,7 +584,7 @@ public boolean truncatePrefix(final long firstIndexKept) { private void truncatePrefixInBackground(final long startIndex, final long firstIndexKept) { // delete logs in background. - Utils.runInThread(() -> { + ThreadPoolsFactory.runInThread(this.groupId, () -> { long startMs = Utils.monotonicMs(); this.readLock.lock(); try { diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/SnapshotExecutorImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/SnapshotExecutorImpl.java index 18b4a7d13..b429dc78a 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/SnapshotExecutorImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/SnapshotExecutorImpl.java @@ -22,6 +22,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import com.alipay.sofa.jraft.util.ThreadPoolsFactory; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -138,7 +139,7 @@ public SaveSnapshotDone(final SnapshotWriter writer, final Closure done, final S @Override public void run(final Status status) { - Utils.runInThread(() -> continueRun(status)); + ThreadPoolsFactory.runInThread(getNode().getGroupId(), () -> continueRun(status)); } void continueRun(final Status st) { @@ -147,7 +148,7 @@ void continueRun(final Status st) { st.setError(ret, "node call onSnapshotSaveDone failed"); } if (this.done != null) { - Utils.runClosureInThread(this.done, st); + ThreadPoolsFactory.runClosureInThread(getNode().getGroupId(), this.done, st); } } @@ -314,23 +315,26 @@ private void doSnapshot(final Closure done, boolean sync) { this.lock.lock(); try { if (this.stopped) { - Utils.runClosureInThread(done, new Status(RaftError.EPERM, "Is stopped.")); + ThreadPoolsFactory.runClosureInThread(getNode().getGroupId(), done, new Status(RaftError.EPERM, + "Is stopped.")); return; } if (sync && !this.fsmCaller.isRunningOnFSMThread()) { - Utils.runClosureInThread(done, new Status(RaftError.EACCES, + ThreadPoolsFactory.runClosureInThread(getNode().getGroupId(), done, new Status(RaftError.EACCES, "trigger snapshot synchronously out of StateMachine's callback methods")); throw new IllegalStateException( "You can't trigger snapshot synchronously out of StateMachine's callback methods."); } if (this.downloadingSnapshot.get() != null) { - Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Is loading another snapshot.")); + ThreadPoolsFactory.runClosureInThread(getNode().getGroupId(), done, new Status(RaftError.EBUSY, + "Is loading another snapshot.")); return; } if (this.savingSnapshot) { - Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Is saving another snapshot.")); + ThreadPoolsFactory.runClosureInThread(getNode().getGroupId(), done, new Status(RaftError.EBUSY, + "Is saving another snapshot.")); return; } @@ -341,7 +345,7 @@ private void doSnapshot(final Closure done, boolean sync) { doUnlock = false; this.lock.unlock(); this.logManager.clearBufferedLogs(); - Utils.runClosureInThread(done); + ThreadPoolsFactory.runClosureInThread(getNode().getGroupId(), done); return; } @@ -356,8 +360,9 @@ private void doSnapshot(final Closure done, boolean sync) { } doUnlock = false; this.lock.unlock(); - Utils + ThreadPoolsFactory .runClosureInThread( + getNode().getGroupId(), done, new Status(RaftError.ECANCELED, "The snapshot index distance since last snapshot is less than NodeOptions#snapshotLogIndexMargin, canceled this task.")); @@ -366,7 +371,8 @@ private void doSnapshot(final Closure done, boolean sync) { final SnapshotWriter writer = this.snapshotStorage.create(); if (writer == null) { - Utils.runClosureInThread(done, new Status(RaftError.EIO, "Fail to create writer.")); + ThreadPoolsFactory.runClosureInThread(getNode().getGroupId(), done, new Status(RaftError.EIO, + "Fail to create writer.")); reportError(RaftError.EIO.getNumber(), "Fail to create snapshot writer."); return; } @@ -376,7 +382,8 @@ private void doSnapshot(final Closure done, boolean sync) { this.fsmCaller.onSnapshotSaveSync(saveSnapshotDone); } else { if (!this.fsmCaller.onSnapshotSave(saveSnapshotDone)) { - Utils.runClosureInThread(done, new Status(RaftError.EHOSTDOWN, "The raft node is down.")); + ThreadPoolsFactory.runClosureInThread(getNode().getGroupId(), done, new Status(RaftError.EHOSTDOWN, + "The raft node is down.")); return; } } @@ -691,6 +698,7 @@ private SnapshotCopierOptions newCopierOpts() { copierOpts.setRaftClientService(this.node.getRpcService()); copierOpts.setTimerManager(this.node.getTimerManager()); copierOpts.setRaftOptions(this.node.getRaftOptions()); + copierOpts.setGroupId(this.node.getGroupId()); return copierOpts; } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotCopier.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotCopier.java index bd9d81cf6..b903126c6 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotCopier.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotCopier.java @@ -45,6 +45,7 @@ import com.alipay.sofa.jraft.util.ArrayDeque; import com.alipay.sofa.jraft.util.ByteBufferCollector; import com.alipay.sofa.jraft.util.Requires; +import com.alipay.sofa.jraft.util.ThreadPoolsFactory; import com.alipay.sofa.jraft.util.Utils; /** @@ -58,6 +59,7 @@ public class LocalSnapshotCopier extends SnapshotCopier { private static final Logger LOG = LoggerFactory.getLogger(LocalSnapshotCopier.class); + private String groupId; private final Lock lock = new ReentrantLock(); /** The copy job future object*/ private volatile Future future; @@ -357,6 +359,7 @@ private void filter() throws IOException { public boolean init(final String uri, final SnapshotCopierOptions opts) { this.copier = new RemoteFileCopier(); this.cancelled = false; + this.groupId = opts.getGroupId(); this.filterBeforeCopyRemote = opts.getNodeOptions().isFilterBeforeCopyRemote(); this.remoteSnapshot = new LocalSnapshot(opts.getRaftOptions()); return this.copier.init(uri, this.snapshotThrottle, opts); @@ -390,7 +393,7 @@ public void close() throws IOException { @Override public void start() { - this.future = Utils.runInThread(this::startCopy); + this.future = ThreadPoolsFactory.runInThread(this.groupId, this::startCopy); } @Override diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/ThreadPoolsFactory.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/ThreadPoolsFactory.java new file mode 100644 index 000000000..8e6fedea1 --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/ThreadPoolsFactory.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alipay.sofa.jraft.Closure; +import com.alipay.sofa.jraft.Status; + +/** + * ThreadPool based on Raft-Group isolation + * + * @author tynan.liu + * @date 2022/6/24 17:25 + * @since 1.3.12 + **/ +public class ThreadPoolsFactory { + private static final Logger LOG = LoggerFactory + .getLogger(ThreadPoolsFactory.class); + /** + * It is used to handle global closure tasks + */ + private static final ConcurrentMap GROUP_THREAD_POOLS = new ConcurrentHashMap<>(); + + private static class GlobalThreadPoolHolder { + private static final ThreadPoolExecutor INSTANCE = ThreadPoolUtil + .newBuilder() + .poolName("JRAFT_GROUP_DEFAULT_EXECUTOR") + .enableMetric(true) + .coreThreads(Utils.MIN_CLOSURE_EXECUTOR_POOL_SIZE) + .maximumThreads(Utils.MAX_CLOSURE_EXECUTOR_POOL_SIZE) + .keepAliveSeconds(60L) + .workQueue(new SynchronousQueue<>()) + .threadFactory( + new NamedThreadFactory( + "JRaft-Group-Default-Executor-", true)).build(); + } + + /** + * You can specify the ThreadPoolExecutor yourself here + * + * @param groupId Raft-Group + * @param executor To specify ThreadPoolExecutor + */ + public static void registerThreadPool(String groupId, ThreadPoolExecutor executor) { + if (executor == null) { + throw new IllegalArgumentException("executor must not be null"); + } + + if (GROUP_THREAD_POOLS.putIfAbsent(groupId, executor) != null) { + throw new IllegalArgumentException(String.format("The group: %s has already registered the ThreadPool", + groupId)); + } + } + + @OnlyForTest + protected static ThreadPoolExecutor getExecutor(String groupId) { + return GROUP_THREAD_POOLS.getOrDefault(groupId, GlobalThreadPoolHolder.INSTANCE); + } + + /** + * Run a task in thread pool,returns the future object. + */ + public static Future runInThread(String groupId, final Runnable runnable) { + return GROUP_THREAD_POOLS.getOrDefault(groupId, GlobalThreadPoolHolder.INSTANCE).submit(runnable); + } + + /** + * Run closure with status in thread pool. + */ + public static Future runClosureInThread(String groupId, final Closure done, final Status status) { + if (done == null) { + return null; + } + return runInThread(groupId, () -> { + try { + done.run(status); + } catch (final Throwable t) { + LOG.error("Fail to run done closure", t); + } + }); + } + + /** + * Run closure with OK status in thread pool. + */ + public static Future runClosureInThread(String groupId, final Closure done) { + if (done == null) { + return null; + } + return runClosureInThread(groupId, done, Status.OK()); + } +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Utils.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Utils.java index c6128ea8b..18e521917 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Utils.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Utils.java @@ -37,19 +37,23 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; + import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.error.RaftError; +import com.alipay.sofa.jraft.util.concurrent.DefaultFixedThreadsExecutorGroupFactory; +import com.alipay.sofa.jraft.util.concurrent.FixedThreadsExecutorGroup; import com.codahale.metrics.MetricRegistry; /** * Helper methods for jraft. * * @author boyan (boyan@alibaba-inc.com) - * + *

* 2018-Apr-07 10:12:35 AM */ public final class Utils { @@ -108,6 +112,7 @@ public final class Utils { /** * Global thread pool to run closure. */ + @Deprecated private static ThreadPoolExecutor CLOSURE_EXECUTOR = ThreadPoolUtil .newBuilder() .poolName("JRAFT_CLOSURE_EXECUTOR") @@ -126,6 +131,19 @@ public final class Utils { private static final Pattern GROUP_ID_PATTER = Pattern .compile("^[a-zA-Z][a-zA-Z0-9\\-_]*$"); + private static class AppendEntriesThreadPoolHolder { + private static final FixedThreadsExecutorGroup INSTANCE = DefaultFixedThreadsExecutorGroupFactory.INSTANCE + .newExecutorGroup( + Utils.APPEND_ENTRIES_THREADS_SEND, + "Append-Entries-Thread-Send", + Utils.MAX_APPEND_ENTRIES_TASKS_PER_THREAD, true); ; + + } + + public static FixedThreadsExecutorGroup getDefaultAppendEntriesExecutor() { + return AppendEntriesThreadPoolHolder.INSTANCE; + } + public static void verifyGroupId(final String groupId) { if (StringUtils.isBlank(groupId)) { throw new IllegalArgumentException("Blank groupId"); @@ -139,13 +157,16 @@ public static void verifyGroupId(final String groupId) { /** * Register CLOSURE_EXECUTOR into metric registry. + * Deprecated, {@link com.alipay.sofa.jraft.util.ThreadPoolsFactory} */ + @Deprecated public static void registerClosureExecutorMetrics(final MetricRegistry registry) { registry.register("raft-utils-closure-thread-pool", new ThreadPoolMetricSet(CLOSURE_EXECUTOR)); } /** * Run closure in current thread. + * * @param done * @param status */ @@ -157,7 +178,9 @@ public static void runClosure(final Closure done, final Status status) { /** * Run closure with OK status in thread pool. + * Deprecated, {@link com.alipay.sofa.jraft.util.ThreadPoolsFactory} */ + @Deprecated public static Future runClosureInThread(final Closure done) { if (done == null) { return null; @@ -167,15 +190,19 @@ public static Future runClosureInThread(final Closure done) { /** * Run a task in thread pool,returns the future object. + * Deprecated, {@link com.alipay.sofa.jraft.util.ThreadPoolsFactory} */ + @Deprecated public static Future runInThread(final Runnable runnable) { return CLOSURE_EXECUTOR.submit(runnable); } /** * Run closure with status in thread pool. + * Deprecated, {@link com.alipay.sofa.jraft.util.ThreadPoolsFactory} */ @SuppressWarnings("Convert2Lambda") + @Deprecated public static Future runClosureInThread(final Closure done, final Status status) { if (done == null) { return null; @@ -386,6 +413,7 @@ public static boolean atomicMoveFile(final File source, final File target, final /** * Calls fsync on a file or directory. + * * @param file file or directory * @throws IOException if an I/O error occurs */ @@ -471,7 +499,6 @@ public static boolean isIPv6(final String addr) { * PeerId.parse("a:b::d") = new PeerId("a", "b", 0, "d") * PeerId.parse("a:b:c:d") = new PeerId("a", "b", "c", "d") * - * */ public static String[] parsePeerId(final String s) { if (s.startsWith(IPV6_START_MARK) && StringUtils.containsIgnoreCase(s, IPV6_END_MARK)) { diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/closure/ClosureQueueTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/closure/ClosureQueueTest.java index 2512e6a54..27508db56 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/closure/ClosureQueueTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/closure/ClosureQueueTest.java @@ -20,6 +20,8 @@ import java.util.List; import java.util.concurrent.CountDownLatch; +import com.alipay.sofa.jraft.util.ThreadPoolsFactory; +import com.codahale.metrics.MetricRegistry; import org.junit.Before; import org.junit.Test; @@ -29,11 +31,12 @@ import static org.junit.Assert.assertTrue; public class ClosureQueueTest { - private ClosureQueueImpl queue; + private static final String GROUP_ID = "group001"; + private ClosureQueueImpl queue; @Before public void setup() { - this.queue = new ClosureQueueImpl(); + this.queue = new ClosureQueueImpl(GROUP_ID); } @SuppressWarnings("SameParameterValue") diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/BallotBoxTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/BallotBoxTest.java index a310e5668..ae859d955 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/BallotBoxTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/BallotBoxTest.java @@ -39,15 +39,16 @@ @RunWith(value = MockitoJUnitRunner.class) public class BallotBoxTest { - private BallotBox box; + private static final String GROUP_ID = "group001"; + private BallotBox box; @Mock - private FSMCaller waiter; - private ClosureQueueImpl closureQueue; + private FSMCaller waiter; + private ClosureQueueImpl closureQueue; @Before public void setup() { BallotBoxOptions opts = new BallotBoxOptions(); - this.closureQueue = new ClosureQueueImpl(); + this.closureQueue = new ClosureQueueImpl(GROUP_ID); opts.setClosureQueue(this.closureQueue); opts.setWaiter(this.waiter); box = new BallotBox(); diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/FSMCallerTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/FSMCallerTest.java index b1131cd7c..4e4eed45c 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/FSMCallerTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/FSMCallerTest.java @@ -16,12 +16,6 @@ */ package com.alipay.sofa.jraft.core; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - import java.util.concurrent.CountDownLatch; import org.junit.After; @@ -53,23 +47,31 @@ import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; import com.alipay.sofa.jraft.test.TestUtils; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + @RunWith(value = MockitoJUnitRunner.class) public class FSMCallerTest { - private FSMCallerImpl fsmCaller; + private static final String GROUP_ID = "group001"; + private FSMCallerImpl fsmCaller; @Mock - private NodeImpl node; + private NodeImpl node; @Mock - private StateMachine fsm; + private StateMachine fsm; @Mock - private LogManager logManager; - private ClosureQueueImpl closureQueue; + private LogManager logManager; + private ClosureQueueImpl closureQueue; @Before public void setup() { this.fsmCaller = new FSMCallerImpl(); - this.closureQueue = new ClosureQueueImpl(); + this.closureQueue = new ClosureQueueImpl(GROUP_ID); final FSMCallerOptions opts = new FSMCallerOptions(); Mockito.when(this.node.getNodeMetrics()).thenReturn(new NodeMetrics(false)); + Mockito.when(this.node.getGroupId()).thenReturn(GROUP_ID); opts.setNode(this.node); opts.setFsm(this.fsm); opts.setLogManager(this.logManager); diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorImplTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorImplTest.java index 47dd94a69..fa23bfb90 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorImplTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorImplTest.java @@ -29,7 +29,6 @@ import org.mockito.runners.MockitoJUnitRunner; import com.alipay.sofa.jraft.Closure; -import com.alipay.sofa.jraft.StateMachine; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.entity.EnumOutter; import com.alipay.sofa.jraft.entity.LogEntry; @@ -45,16 +44,21 @@ @RunWith(value = MockitoJUnitRunner.class) public class IteratorImplTest { - private IteratorImpl iter; + private static final String GROUP_ID = "group001"; + private IteratorImpl iter; @Mock - private FSMCallerImpl fsmCaller; + private NodeImpl node; @Mock - private LogManager logManager; - private List closures; - private AtomicLong applyingIndex; + private FSMCallerImpl fsmCaller; + @Mock + private LogManager logManager; + private List closures; + private AtomicLong applyingIndex; @Before public void setup() { + Mockito.when(this.node.getGroupId()).thenReturn(GROUP_ID); + Mockito.when(this.fsmCaller.getNode()).thenReturn(node); this.applyingIndex = new AtomicLong(0); this.closures = new ArrayList<>(); for (int i = 0; i < 11; i++) { diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java index 10cd12696..82f89e03c 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java @@ -33,6 +33,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import com.alipay.sofa.jraft.util.ThreadPoolsFactory; +import com.codahale.metrics.MetricRegistry; import org.apache.commons.io.FileUtils; import org.junit.After; import org.junit.AfterClass; @@ -60,7 +62,6 @@ import com.alipay.sofa.jraft.closure.TaskClosure; import com.alipay.sofa.jraft.conf.Configuration; import com.alipay.sofa.jraft.entity.EnumOutter; -import com.alipay.sofa.jraft.entity.LogEntry; import com.alipay.sofa.jraft.entity.LogId; import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.entity.Task; @@ -100,6 +101,7 @@ public class NodeTest { static final Logger LOG = LoggerFactory.getLogger(NodeTest.class); + public static final String GROUP_ID = "test"; private String dataPath; @@ -3074,6 +3076,7 @@ public void testBootStrapWithSnapshot() throws Exception { opts.setSnapshotUri(this.dataPath + File.separator + "snapshot"); opts.setGroupConf(JRaftUtils.getConfiguration("127.0.0.1:5006")); opts.setFsm(fsm); + opts.setGroupId(GROUP_ID); NodeManager.getInstance().addAddress(addr); assertTrue(JRaftUtils.bootstrap(opts)); @@ -3084,7 +3087,7 @@ public void testBootStrapWithSnapshot() throws Exception { nodeOpts.setSnapshotUri(this.dataPath + File.separator + "snapshot"); nodeOpts.setFsm(fsm); - final NodeImpl node = new NodeImpl("test", new PeerId(addr, 0)); + final NodeImpl node = new NodeImpl(GROUP_ID, new PeerId(addr, 0)); assertTrue(node.init(nodeOpts)); assertEquals(26, fsm.getLogs().size()); @@ -3113,6 +3116,7 @@ public void testBootStrapWithoutSnapshot() throws Exception { opts.setSnapshotUri(this.dataPath + File.separator + "snapshot"); opts.setGroupConf(JRaftUtils.getConfiguration("127.0.0.1:5006")); opts.setFsm(fsm); + opts.setGroupId(GROUP_ID); NodeManager.getInstance().addAddress(addr); assertTrue(JRaftUtils.bootstrap(opts)); @@ -3123,7 +3127,7 @@ public void testBootStrapWithoutSnapshot() throws Exception { nodeOpts.setSnapshotUri(this.dataPath + File.separator + "snapshot"); nodeOpts.setFsm(fsm); - final NodeImpl node = new NodeImpl("test", new PeerId(addr, 0)); + final NodeImpl node = new NodeImpl(GROUP_ID, new PeerId(addr, 0)); assertTrue(node.init(nodeOpts)); while (!node.isLeader()) { Thread.sleep(20); @@ -3296,7 +3300,7 @@ private Future startChangePeersThread(final ChangeArg arg) { expectedErrors.add(RaftError.EPERM); expectedErrors.add(RaftError.ECATCHUP); - return Utils.runInThread(() -> { + return TestUtils.runInThread(() -> { try { while (!arg.stop) { arg.c.waitLeader(); @@ -3464,7 +3468,7 @@ public void testChangePeersChaosApplyTasks() throws Exception { args.add(arg); futures.add(startChangePeersThread(arg)); - Utils.runInThread(() -> { + TestUtils.runInThread(() -> { try { for (int i = 0; i < 5000;) { cluster.waitLeader(); diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReadOnlyServiceTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReadOnlyServiceTest.java index e05693b79..ca1a64c8d 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReadOnlyServiceTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReadOnlyServiceTest.java @@ -55,6 +55,7 @@ @RunWith(MockitoJUnitRunner.class) public class ReadOnlyServiceTest { + public static final String GROUP_ID = "test"; private ReadOnlyServiceImpl readOnlyServiceImpl; @Mock @@ -72,7 +73,7 @@ public void setup() { opts.setRaftOptions(new RaftOptions()); Mockito.when(this.node.getNodeMetrics()).thenReturn(new NodeMetrics(false)); Mockito.when(this.node.getOptions()).thenReturn(new NodeOptions()); - Mockito.when(this.node.getGroupId()).thenReturn("test"); + Mockito.when(this.node.getGroupId()).thenReturn(GROUP_ID); Mockito.when(this.node.getServerId()).thenReturn(new PeerId("localhost:8081", 0)); assertTrue(this.readOnlyServiceImpl.init(opts)); } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReplicatorTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReplicatorTest.java index 56e3fdfd9..552fef76a 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReplicatorTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReplicatorTest.java @@ -69,21 +69,22 @@ @RunWith(value = MockitoJUnitRunner.class) public class ReplicatorTest { - private ThreadId id; - private final RaftOptions raftOptions = new RaftOptions(); - private TimerManager timerManager; + private static final String GROUP_ID = "test"; + private ThreadId id; + private final RaftOptions raftOptions = new RaftOptions(); + private TimerManager timerManager; @Mock - private RaftClientService rpcService; + private RaftClientService rpcService; @Mock - private NodeImpl node; + private NodeImpl node; @Mock - private BallotBox ballotBox; + private BallotBox ballotBox; @Mock - private LogManager logManager; + private LogManager logManager; @Mock - private SnapshotStorage snapshotStorage; - private ReplicatorOptions opts; - private final PeerId peerId = new PeerId("localhost", 8081); + private SnapshotStorage snapshotStorage; + private ReplicatorOptions opts; + private final PeerId peerId = new PeerId("localhost", 8081); @Before public void setup() { @@ -92,7 +93,7 @@ public void setup() { this.opts.setRaftRpcService(this.rpcService); this.opts.setPeerId(this.peerId); this.opts.setBallotBox(this.ballotBox); - this.opts.setGroupId("test"); + this.opts.setGroupId(GROUP_ID); this.opts.setTerm(1); this.opts.setServerId(new PeerId("localhost", 8082)); this.opts.setNode(this.node); @@ -344,7 +345,7 @@ public void testOnRpcReturnedWaitMoreEntries() throws Exception { Mockito.when(this.logManager.wait(eq(10L), Mockito.any(), same(this.id))).thenReturn(99L); final CountDownLatch latch = new CountDownLatch(1); - Replicator.waitForCaughtUp(this.id, 1, System.currentTimeMillis() + 5000, new CatchUpClosure() { + Replicator.waitForCaughtUp(GROUP_ID, this.id, 1, System.currentTimeMillis() + 5000, new CatchUpClosure() { @Override public void run(final Status status) { diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/SnapshotExecutorTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/SnapshotExecutorTest.java index e784afb25..d81da5b54 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/SnapshotExecutorTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/SnapshotExecutorTest.java @@ -21,9 +21,6 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; -import com.alipay.sofa.jraft.error.RaftError; -import com.alipay.sofa.jraft.test.MockAsyncContext; - import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -33,6 +30,7 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; import com.alipay.sofa.jraft.FSMCaller; import com.alipay.sofa.jraft.Status; @@ -44,6 +42,7 @@ import com.alipay.sofa.jraft.core.TimerManager; import com.alipay.sofa.jraft.entity.LocalFileMetaOutter; import com.alipay.sofa.jraft.entity.RaftOutter; +import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.option.CopyOptions; import com.alipay.sofa.jraft.option.NodeOptions; import com.alipay.sofa.jraft.option.RaftOptions; @@ -61,11 +60,11 @@ import com.alipay.sofa.jraft.storage.snapshot.local.LocalSnapshotReader; import com.alipay.sofa.jraft.storage.snapshot.local.LocalSnapshotStorage; import com.alipay.sofa.jraft.storage.snapshot.local.LocalSnapshotWriter; +import com.alipay.sofa.jraft.test.MockAsyncContext; +import com.alipay.sofa.jraft.test.TestUtils; import com.alipay.sofa.jraft.util.Endpoint; -import com.alipay.sofa.jraft.util.Utils; import com.google.protobuf.ByteString; import com.google.protobuf.Message; -import org.mockito.stubbing.Answer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -74,6 +73,7 @@ @RunWith(value = MockitoJUnitRunner.class) public class SnapshotExecutorTest extends BaseStorageTest { + private static final String GROUP_ID = "group001"; private SnapshotExecutorImpl executor; @Mock private NodeImpl node; @@ -118,6 +118,7 @@ public void setup() throws Exception { this.uri = "remote://" + this.hostPort + "/" + this.readerId; this.copyOpts = new CopyOptions(); + Mockito.when(this.node.getGroupId()).thenReturn(GROUP_ID); Mockito.when(this.node.getRaftOptions()).thenReturn(new RaftOptions()); Mockito.when(this.node.getOptions()).thenReturn(new NodeOptions()); Mockito.when(this.node.getRpcService()).thenReturn(this.raftClientService); @@ -185,23 +186,15 @@ public Future answer(InvocationOnMock invocation) throws Throwable { final MockAsyncContext installContext = new MockAsyncContext(); final MockAsyncContext retryInstallContext = new MockAsyncContext(); - Utils.runInThread(new Runnable() { - @Override - public void run() { - SnapshotExecutorTest.this.executor.installSnapshot(irb.build(), - RpcRequests.InstallSnapshotResponse.newBuilder(), new RpcRequestClosure(installContext)); - } - }); + TestUtils.runInThread(() -> SnapshotExecutorTest.this.executor.installSnapshot(irb.build(), + RpcRequests.InstallSnapshotResponse.newBuilder(), new RpcRequestClosure(installContext))); Thread.sleep(500); retryLatch.await(); - Utils.runInThread(new Runnable() { - @Override - public void run() { - answerLatch.countDown(); - SnapshotExecutorTest.this.executor.installSnapshot(irb.build(), - RpcRequests.InstallSnapshotResponse.newBuilder(), new RpcRequestClosure(retryInstallContext)); - } + TestUtils.runInThread(() -> { + answerLatch.countDown(); + SnapshotExecutorTest.this.executor.installSnapshot(irb.build(), + RpcRequests.InstallSnapshotResponse.newBuilder(), new RpcRequestClosure(retryInstallContext)); }); RpcResponseClosure closure = argument.getValue(); @@ -265,7 +258,7 @@ public void testInstallSnapshot() throws Exception { Mockito.when( this.raftClientService.getFile(eq(new Endpoint("localhost", 8080)), eq(rb.build()), eq(this.copyOpts.getTimeoutMs()), argument.capture())).thenReturn(future); - Utils.runInThread(new Runnable() { + TestUtils.runInThread(new Runnable() { @Override public void run() { @@ -333,7 +326,7 @@ public void testInterruptInstallaling() throws Exception { Mockito.when( this.raftClientService.getFile(eq(new Endpoint("localhost", 8080)), eq(rb.build()), eq(this.copyOpts.getTimeoutMs()), argument.capture())).thenReturn(future); - Utils.runInThread(new Runnable() { + TestUtils.runInThread(new Runnable() { @Override public void run() { diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/BaseLogStorageTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/BaseLogStorageTest.java index b13cca754..7152d56d6 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/BaseLogStorageTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/BaseLogStorageTest.java @@ -16,12 +16,6 @@ */ package com.alipay.sofa.jraft.storage.impl; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -45,7 +39,14 @@ import com.alipay.sofa.jraft.test.TestUtils; import com.alipay.sofa.jraft.util.Utils; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + public abstract class BaseLogStorageTest extends BaseStorageTest { + private static final String GROUP_ID = "group001"; protected LogStorage logStorage; private ConfigurationManager confManager; private LogEntryCodecFactory logEntryCodecFactory; @@ -69,6 +70,7 @@ protected LogStorageOptions newLogStorageOptions() { final LogStorageOptions opts = new LogStorageOptions(); opts.setConfigurationManager(this.confManager); opts.setLogEntryCodecFactory(this.logEntryCodecFactory); + opts.setGroupId(GROUP_ID); return opts; } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/LogManagerTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/LogManagerTest.java index ecfe2b71b..e58c8dc88 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/LogManagerTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/LogManagerTest.java @@ -55,6 +55,7 @@ @RunWith(value = MockitoJUnitRunner.class) public class LogManagerTest extends BaseStorageTest { + private static final String GROUP_ID = "group001"; private LogManagerImpl logManager; private ConfigurationManager confManager; @Mock @@ -77,6 +78,7 @@ public void setup() throws Exception { opts.setNodeMetrics(new NodeMetrics(false)); opts.setLogStorage(this.logStorage); opts.setRaftOptions(raftOptions); + opts.setGroupId(GROUP_ID); assertTrue(this.logManager.init(opts)); } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java index fba2d1667..676f4da8a 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java @@ -45,8 +45,8 @@ import com.alipay.sofa.jraft.storage.BaseStorageTest; import com.alipay.sofa.jraft.storage.snapshot.Snapshot; import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; +import com.alipay.sofa.jraft.test.TestUtils; import com.alipay.sofa.jraft.util.Endpoint; -import com.alipay.sofa.jraft.util.Utils; import com.google.protobuf.ByteString; import com.google.protobuf.Message; @@ -58,6 +58,7 @@ @RunWith(value = MockitoJUnitRunner.class) public class LocalSnapshotCopierTest extends BaseStorageTest { + private static final String GROUP_ID = "group001"; private LocalSnapshotCopier copier; @Mock private RaftClientService raftClientService; @@ -93,8 +94,8 @@ public void setup() throws Exception { this.copier = new LocalSnapshotCopier(); this.copyOpts = new CopyOptions(); Mockito.when(this.raftClientService.connect(new Endpoint("localhost", 8081))).thenReturn(true); - assertTrue(this.copier.init(this.uri, new SnapshotCopierOptions(this.raftClientService, this.timerManager, - this.raftOptions, new NodeOptions()))); + assertTrue(this.copier.init(this.uri, new SnapshotCopierOptions(GROUP_ID, this.raftClientService, + this.timerManager, this.raftOptions, new NodeOptions()))); this.copier.setStorage(this.snapshotStorage); } @@ -148,7 +149,7 @@ public void testInterrupt() throws Exception { this.copier.start(); Thread.sleep(10); - Utils.runInThread(new Runnable() { + TestUtils.runInThread(new Runnable() { @Override public void run() { diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/remote/RemoteFileCopierTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/remote/RemoteFileCopierTest.java index 5b625d9c2..3097c4847 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/remote/RemoteFileCopierTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/remote/RemoteFileCopierTest.java @@ -37,10 +37,11 @@ @RunWith(value = MockitoJUnitRunner.class) public class RemoteFileCopierTest { - private RemoteFileCopier copier; + private static final String GROUP_ID = "group001"; + private RemoteFileCopier copier; @Mock - private RaftClientService rpcService; - private TimerManager timerManager; + private RaftClientService rpcService; + private TimerManager timerManager; @Before public void setup() { @@ -51,8 +52,8 @@ public void setup() { @Test public void testInit() { Mockito.when(rpcService.connect(new Endpoint("localhost", 8081))).thenReturn(true); - assertTrue(copier.init("remote://localhost:8081/999", null, new SnapshotCopierOptions(rpcService, timerManager, - new RaftOptions(), new NodeOptions()))); + assertTrue(copier.init("remote://localhost:8081/999", null, new SnapshotCopierOptions(GROUP_ID, rpcService, + timerManager, new RaftOptions(), new NodeOptions()))); assertEquals(999, copier.getReaderId()); Assert.assertEquals("localhost", copier.getEndpoint().getIp()); Assert.assertEquals(8081, copier.getEndpoint().getPort()); @@ -61,7 +62,7 @@ public void testInit() { @Test public void testInitFail() { Mockito.when(rpcService.connect(new Endpoint("localhost", 8081))).thenReturn(false); - assertFalse(copier.init("remote://localhost:8081/999", null, new SnapshotCopierOptions(rpcService, + assertFalse(copier.init("remote://localhost:8081/999", null, new SnapshotCopierOptions(GROUP_ID, rpcService, timerManager, new RaftOptions(), new NodeOptions()))); } } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/test/TestUtils.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/test/TestUtils.java index 939b0de80..b9102f946 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/test/TestUtils.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/test/TestUtils.java @@ -28,7 +28,11 @@ import java.util.ArrayList; import java.util.Enumeration; import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.ThreadPoolExecutor; + import com.alipay.sofa.jraft.JRaftUtils; import com.alipay.sofa.jraft.conf.ConfigurationEntry; import com.alipay.sofa.jraft.entity.EnumOutter; @@ -37,16 +41,29 @@ import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.rpc.RpcRequests; import com.alipay.sofa.jraft.util.Endpoint; +import com.alipay.sofa.jraft.util.NamedThreadFactory; +import com.alipay.sofa.jraft.util.ThreadPoolUtil; /** * Test helper * * @author boyan (boyan@alibaba-inc.com) - * - * 2018-Apr-11 10:16:07 AM + *

+ * 2018-Apr-11 10:16:07 AM */ public class TestUtils { + private static ThreadPoolExecutor executor = ThreadPoolUtil + .newBuilder() + .poolName("JRAFT_CLOSURE_EXECUTOR") + .enableMetric(true) + .coreThreads(5) + .maximumThreads(10) + .keepAliveSeconds(60L) + .workQueue(new SynchronousQueue<>()) + .threadFactory( + new NamedThreadFactory("JRaft-Closure-Executor-", true)).build(); + public static ConfigurationEntry getConfEntry(final String confStr, final String oldConfStr) { ConfigurationEntry entry = new ConfigurationEntry(); entry.setConf(JRaftUtils.getConfiguration(confStr)); @@ -158,4 +175,9 @@ public static byte[] getRandomBytes() { ThreadLocalRandom.current().nextBytes(requestContext); return requestContext; } + + public static Future runInThread(final Runnable runnable) { + return executor.submit(runnable); + } + } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/util/CountDownEventTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/util/CountDownEventTest.java index 2ee8a6597..938118970 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/util/CountDownEventTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/util/CountDownEventTest.java @@ -19,6 +19,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; +import com.alipay.sofa.jraft.test.TestUtils; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -31,19 +32,15 @@ public void testAwait() throws Exception { e.incrementAndGet(); AtomicLong cost = new AtomicLong(0); CountDownLatch latch = new CountDownLatch(1); - Utils.runInThread(new Runnable() { - - @Override - public void run() { - try { - long start = System.currentTimeMillis(); - e.await(); - cost.set(System.currentTimeMillis() - start); - } catch (Exception e) { - e.printStackTrace(); - } - latch.countDown(); + TestUtils.runInThread(() -> { + try { + long start = System.currentTimeMillis(); + e.await(); + cost.set(System.currentTimeMillis() - start); + } catch (Exception e1) { + e1.printStackTrace(); } + latch.countDown(); }); Thread.sleep(1000); e.countDown(); @@ -59,18 +56,14 @@ public void testInterrupt() throws Exception { e.incrementAndGet(); e.incrementAndGet(); Thread thread = Thread.currentThread(); - Utils.runInThread(new Runnable() { - - @Override - public void run() { - try { - Thread.sleep(100); - thread.interrupt(); - } catch (Exception e) { - e.printStackTrace(); - } - + TestUtils.runInThread(() -> { + try { + Thread.sleep(100); + thread.interrupt(); + } catch (Exception e1) { + e1.printStackTrace(); } + }); e.await(); } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/util/ThreadPoolsFactoryTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/util/ThreadPoolsFactoryTest.java new file mode 100644 index 000000000..6b4bca454 --- /dev/null +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/util/ThreadPoolsFactoryTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; + +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.error.RaftError; + +import junit.framework.TestCase; + +/** + * @author far.liu + */ +@RunWith(value = MockitoJUnitRunner.class) +public class ThreadPoolsFactoryTest extends TestCase { + private static final String GROUP_ID_001 = "group001"; + private static final String GROUP_ID_002 = "group002"; + private static final String GROUP_ID_003 = "group003"; + private ThreadPoolExecutor customExecutor = ThreadPoolUtil + .newBuilder() + .poolName("JRAFT_TEST_CUSTOM_EXECUTOR") + .enableMetric(true) + .coreThreads(Utils.MIN_CLOSURE_EXECUTOR_POOL_SIZE) + .maximumThreads(Utils.MAX_CLOSURE_EXECUTOR_POOL_SIZE) + .keepAliveSeconds(60L) + .workQueue(new SynchronousQueue<>()) + .threadFactory( + new NamedThreadFactory("JRaft-Test-Custom-Executor-", true)) + .build(); + + @Test + public void testGlobalExecutor() { + ThreadPoolExecutor executor1 = ThreadPoolsFactory.getExecutor(GROUP_ID_001); + ThreadPoolExecutor executor2 = ThreadPoolsFactory.getExecutor(GROUP_ID_002); + Assert.assertEquals(executor1, executor2); + } + + @Test + public void testCustomExecutor() { + ThreadPoolsFactory.registerThreadPool(GROUP_ID_003, customExecutor); + ThreadPoolExecutor executor = ThreadPoolsFactory.getExecutor(GROUP_ID_003); + Assert.assertEquals(executor, customExecutor); + } + + @Test + public void testInvalidGroup() { + ThreadPoolExecutor executor1 = ThreadPoolsFactory.getExecutor(GROUP_ID_001); + ThreadPoolExecutor executor = ThreadPoolsFactory.getExecutor("test"); + Assert.assertEquals(executor1, executor); + } + + @Test + public void testRunThread() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + ThreadPoolsFactory.runInThread(GROUP_ID_001, () -> latch.countDown()); + latch.await(); + } + + @Test + public void testRunClosureWithStatus() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + ThreadPoolsFactory.runClosureInThread(GROUP_ID_001, status -> { + assertFalse(status.isOk()); + Assert.assertEquals(RaftError.EACCES.getNumber(), status.getCode()); + assertEquals("test 99", status.getErrorMsg()); + latch.countDown(); + }, new Status(RaftError.EACCES, "test %d", 99)); + latch.await(); + } + + @Test + public void testRunClosure() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + ThreadPoolsFactory.runClosureInThread(GROUP_ID_001, status -> { + assertTrue(status.isOk()); + latch.countDown(); + }); + latch.await(); + } +} \ No newline at end of file diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/util/UtilsTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/util/UtilsTest.java index 9b77eaf3f..e315d5ec6 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/util/UtilsTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/util/UtilsTest.java @@ -39,19 +39,6 @@ */ public class UtilsTest { - @Test - public void testRunThread() throws Exception { - CountDownLatch latch = new CountDownLatch(1); - Utils.runInThread(new Runnable() { - - @Override - public void run() { - latch.countDown(); - } - }); - latch.await(); - } - @Test(expected = IllegalArgumentException.class) public void tetsVerifyGroupId1() { Utils.verifyGroupId(""); @@ -83,36 +70,6 @@ public void tetsVerifyGroupId5() { Utils.verifyGroupId("t_hello"); } - @Test - public void testRunClosure() throws Exception { - CountDownLatch latch = new CountDownLatch(1); - Utils.runClosureInThread(new Closure() { - - @Override - public void run(Status status) { - assertTrue(status.isOk()); - latch.countDown(); - } - }); - latch.await(); - } - - @Test - public void testRunClosureWithStatus() throws Exception { - CountDownLatch latch = new CountDownLatch(1); - Utils.runClosureInThread(new Closure() { - - @Override - public void run(Status status) { - assertFalse(status.isOk()); - Assert.assertEquals(RaftError.EACCES.getNumber(), status.getCode()); - assertEquals("test 99", status.getErrorMsg()); - latch.countDown(); - } - }, new Status(RaftError.EACCES, "test %d", 99)); - latch.await(); - } - @Test public void test_getProcessId() { long pid = Utils.getProcessId(-1); diff --git a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/counter/CounterStateMachine.java b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/counter/CounterStateMachine.java index a578efd2b..bb3589a7b 100644 --- a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/counter/CounterStateMachine.java +++ b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/counter/CounterStateMachine.java @@ -21,7 +21,12 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicLong; + +import com.alipay.sofa.jraft.util.NamedThreadFactory; +import com.alipay.sofa.jraft.util.ThreadPoolUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alipay.remoting.exception.CodecException; @@ -46,16 +51,25 @@ */ public class CounterStateMachine extends StateMachineAdapter { - private static final Logger LOG = LoggerFactory.getLogger(CounterStateMachine.class); - + private static final Logger LOG = LoggerFactory.getLogger(CounterStateMachine.class); + private static ThreadPoolExecutor executor = ThreadPoolUtil + .newBuilder() + .poolName("JRAFT_TEST_EXECUTOR") + .enableMetric(true) + .coreThreads(3) + .maximumThreads(5) + .keepAliveSeconds(60L) + .workQueue(new SynchronousQueue<>()) + .threadFactory( + new NamedThreadFactory("JRaft-Test-Executor-", true)).build(); /** * Counter value */ - private final AtomicLong value = new AtomicLong(0); + private final AtomicLong value = new AtomicLong(0); /** * Leader term */ - private final AtomicLong leaderTerm = new AtomicLong(-1); + private final AtomicLong leaderTerm = new AtomicLong(-1); public boolean isLeader() { return this.leaderTerm.get() > 0; @@ -120,7 +134,7 @@ public void onApply(final Iterator iter) { @Override public void onSnapshotSave(final SnapshotWriter writer, final Closure done) { final long currVal = this.value.get(); - Utils.runInThread(() -> { + executor.submit(() -> { final CounterSnapshotFile snapshot = new CounterSnapshotFile(writer.getPath() + File.separator + "data"); if (snapshot.save(currVal)) { if (writer.addFile("data")) { diff --git a/jraft-extension/bdb-log-storage-impl/src/main/java/com/alipay/sofa/jraft/storage/impl/BDBLogStorage.java b/jraft-extension/bdb-log-storage-impl/src/main/java/com/alipay/sofa/jraft/storage/impl/BDBLogStorage.java index a3fdab44b..95ef83757 100644 --- a/jraft-extension/bdb-log-storage-impl/src/main/java/com/alipay/sofa/jraft/storage/impl/BDBLogStorage.java +++ b/jraft-extension/bdb-log-storage-impl/src/main/java/com/alipay/sofa/jraft/storage/impl/BDBLogStorage.java @@ -24,6 +24,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.alipay.sofa.jraft.util.ThreadPoolsFactory; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.slf4j.Logger; @@ -61,7 +62,7 @@ /** * Log storage based on bdb. - * + * * @author cff * */ @@ -71,6 +72,7 @@ public class BDBLogStorage implements LogStorage, Describer { static final String DEFAULT_DATABASE_NAME = "jraft-log"; static final String CONF_DATABASE_NAME = "jraft-conf"; + private String groupId; private Database defaultTable; private Database confTable; private Environment environment; @@ -106,6 +108,7 @@ public boolean init(LogStorageOptions opts) { Requires.requireNonNull(opts, "Null LogStorageOptions opts"); Requires.requireNonNull(opts.getConfigurationManager(), "Null conf manager"); Requires.requireNonNull(opts.getLogEntryCodecFactory(), "Null log entry codec factory"); + this.groupId = opts.getGroupId(); this.logEntryDecoder = opts.getLogEntryCodecFactory().decoder(); this.logEntryEncoder = opts.getLogEntryCodecFactory().encoder(); this.writeLock.lock(); @@ -506,7 +509,7 @@ private void truncatePrefixInBackground(final long startIndex, final long firstI return; } // delete logs in background. - Utils.runInThread(() -> { + ThreadPoolsFactory.runInThread(this.groupId, () -> { this.readLock.lock(); try { checkState(); diff --git a/jraft-extension/java-log-storage-impl/src/main/java/com/alipay/sofa/jraft/storage/LogitLogStorage.java b/jraft-extension/java-log-storage-impl/src/main/java/com/alipay/sofa/jraft/storage/LogitLogStorage.java index 312f1d17a..6000d4de6 100644 --- a/jraft-extension/java-log-storage-impl/src/main/java/com/alipay/sofa/jraft/storage/LogitLogStorage.java +++ b/jraft-extension/java-log-storage-impl/src/main/java/com/alipay/sofa/jraft/storage/LogitLogStorage.java @@ -24,6 +24,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.alipay.sofa.jraft.util.ThreadPoolsFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +52,6 @@ import com.alipay.sofa.jraft.util.OnlyForTest; import com.alipay.sofa.jraft.util.Pair; import com.alipay.sofa.jraft.util.Requires; -import com.alipay.sofa.jraft.util.Utils; /** * A logStorage implemented by java @@ -72,6 +72,7 @@ public class LogitLogStorage implements LogStorage { private final String indexStorePath; private final String segmentStorePath; private final String confStorePath; + private String groupId; private ConfigurationManager configurationManager; private LogEntryEncoder logEntryEncoder; private LogEntryDecoder logEntryDecoder; @@ -93,6 +94,7 @@ public LogitLogStorage(final String path, final StoreOptions storeOptions) { public boolean init(final LogStorageOptions opts) { Requires.requireNonNull(opts.getConfigurationManager(), "Null conf manager"); Requires.requireNonNull(opts.getLogEntryCodecFactory(), "Null log entry codec factory"); + this.groupId = opts.getGroupId(); this.writeLock.lock(); try { this.logEntryDecoder = opts.getLogEntryCodecFactory().decoder(); @@ -463,7 +465,7 @@ public boolean truncatePrefix(final long firstIndexKept) { try { final boolean ret = saveFirstLogIndex(firstIndexKept); if (ret) { - Utils.runInThread(() -> { + ThreadPoolsFactory.runInThread(this.groupId, () -> { this.indexDB.truncatePrefix(firstIndexKept); this.segmentLogDB.truncatePrefix(firstIndexKept); this.confDB.truncatePrefix(firstIndexKept); diff --git a/jraft-extension/java-log-storage-impl/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java b/jraft-extension/java-log-storage-impl/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java index 6241e0a92..54eb5c0ea 100644 --- a/jraft-extension/java-log-storage-impl/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java +++ b/jraft-extension/java-log-storage-impl/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java @@ -95,6 +95,7 @@ public class NodeTest { static final Logger LOG = LoggerFactory.getLogger(NodeTest.class); + public static final String GROUP_ID = "test"; private String dataPath; @@ -1550,15 +1551,20 @@ private boolean assertReadIndex(final Node node, final int index) throws Interru @Override public void run(final Status status, final long theIndex, final byte[] reqCtx) { - if (status.isOk()) { - assertEquals(index, theIndex); - assertArrayEquals(requestContext, reqCtx); - success.set(true); - } else { - assertTrue(status.getErrorMsg(), status.getErrorMsg().contains("RPC exception:Check connection[")); - assertTrue(status.getErrorMsg(), status.getErrorMsg().contains("] fail and try to create new one")); + try { + if (status.isOk()) { + assertEquals(index, theIndex); + assertArrayEquals(requestContext, reqCtx); + success.set(true); + } else { + assertTrue(status.getErrorMsg(), + status.getErrorMsg().contains("RPC exception:Check connection[")); + assertTrue(status.getErrorMsg(), + status.getErrorMsg().contains("] fail and try to create new one")); + } + } finally { + latch.countDown(); } - latch.countDown(); } }); latch.await(); @@ -2980,6 +2986,7 @@ public void testBootStrapWithSnapshot() throws Exception { opts.setSnapshotUri(this.dataPath + File.separator + "snapshot"); opts.setGroupConf(JRaftUtils.getConfiguration("127.0.0.1:5006")); opts.setFsm(fsm); + opts.setGroupId(GROUP_ID); NodeManager.getInstance().addAddress(addr); assertTrue(JRaftUtils.bootstrap(opts)); @@ -2990,7 +2997,7 @@ public void testBootStrapWithSnapshot() throws Exception { nodeOpts.setSnapshotUri(this.dataPath + File.separator + "snapshot"); nodeOpts.setFsm(fsm); - final NodeImpl node = new NodeImpl("test", new PeerId(addr, 0)); + final NodeImpl node = new NodeImpl(GROUP_ID, new PeerId(addr, 0)); assertTrue(node.init(nodeOpts)); assertEquals(26, fsm.getLogs().size()); @@ -3019,6 +3026,7 @@ public void testBootStrapWithoutSnapshot() throws Exception { opts.setSnapshotUri(this.dataPath + File.separator + "snapshot"); opts.setGroupConf(JRaftUtils.getConfiguration("127.0.0.1:5006")); opts.setFsm(fsm); + opts.setGroupId(GROUP_ID); NodeManager.getInstance().addAddress(addr); assertTrue(JRaftUtils.bootstrap(opts)); @@ -3029,7 +3037,7 @@ public void testBootStrapWithoutSnapshot() throws Exception { nodeOpts.setSnapshotUri(this.dataPath + File.separator + "snapshot"); nodeOpts.setFsm(fsm); - final NodeImpl node = new NodeImpl("test", new PeerId(addr, 0)); + final NodeImpl node = new NodeImpl(GROUP_ID, new PeerId(addr, 0)); assertTrue(node.init(nodeOpts)); while (!node.isLeader()) { Thread.sleep(20); @@ -3201,7 +3209,7 @@ private Future startChangePeersThread(final ChangeArg arg) { expectedErrors.add(RaftError.EPERM); expectedErrors.add(RaftError.ECATCHUP); - return Utils.runInThread(() -> { + return TestUtils.runInThread(() -> { try { while (!arg.stop) { arg.c.waitLeader(); @@ -3362,9 +3370,9 @@ public void testChangePeersChaosApplyTasks() throws Exception { args.add(arg); futures.add(startChangePeersThread(arg)); - Utils.runInThread(() -> { + TestUtils.runInThread(() -> { try { - for (int i = 0; i < 5000;) { + for (int i = 0; i < 5000; ) { cluster.waitLeader(); final Node leader = cluster.getLeader(); if (leader == null) { diff --git a/jraft-extension/java-log-storage-impl/src/test/java/com/alipay/sofa/jraft/storage/BaseStorageTest.java b/jraft-extension/java-log-storage-impl/src/test/java/com/alipay/sofa/jraft/storage/BaseStorageTest.java index dc29a0132..735b77e73 100644 --- a/jraft-extension/java-log-storage-impl/src/test/java/com/alipay/sofa/jraft/storage/BaseStorageTest.java +++ b/jraft-extension/java-log-storage-impl/src/test/java/com/alipay/sofa/jraft/storage/BaseStorageTest.java @@ -37,6 +37,7 @@ import static com.alipay.sofa.jraft.test.TestUtils.mockEntry; public class BaseStorageTest { + protected static final String GROUP_ID = "group001"; protected String path; protected StoreOptions storeOptions = new StoreOptions(); protected int indexEntrySize; @@ -93,6 +94,7 @@ protected LogStorageOptions newLogStorageOptions() { final LogStorageOptions opts = new LogStorageOptions(); opts.setConfigurationManager(this.confManager); opts.setLogEntryCodecFactory(this.logEntryCodecFactory); + opts.setGroupId(GROUP_ID); return opts; } } diff --git a/jraft-extension/java-log-storage-impl/src/test/java/com/alipay/sofa/jraft/test/TestUtils.java b/jraft-extension/java-log-storage-impl/src/test/java/com/alipay/sofa/jraft/test/TestUtils.java index deadf29d3..29dccfbf4 100644 --- a/jraft-extension/java-log-storage-impl/src/test/java/com/alipay/sofa/jraft/test/TestUtils.java +++ b/jraft-extension/java-log-storage-impl/src/test/java/com/alipay/sofa/jraft/test/TestUtils.java @@ -28,7 +28,10 @@ import java.util.ArrayList; import java.util.Enumeration; import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.ThreadPoolExecutor; import com.alipay.sofa.jraft.JRaftUtils; import com.alipay.sofa.jraft.conf.ConfigurationEntry; @@ -38,16 +41,29 @@ import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.rpc.RpcRequests; import com.alipay.sofa.jraft.util.Endpoint; +import com.alipay.sofa.jraft.util.NamedThreadFactory; +import com.alipay.sofa.jraft.util.ThreadPoolUtil; /** * Test helper * * @author boyan (boyan@alibaba-inc.com) * - * 2018-Apr-11 10:16:07 AM + * 2018-Apr-11 10:16:07 AM */ public class TestUtils { + private static ThreadPoolExecutor executor = ThreadPoolUtil + .newBuilder() + .poolName("JRAFT_CLOSURE_EXECUTOR") + .enableMetric(true) + .coreThreads(5) + .maximumThreads(10) + .keepAliveSeconds(60L) + .workQueue(new SynchronousQueue<>()) + .threadFactory( + new NamedThreadFactory("JRaft-Closure-Executor-", true)).build(); + public static ConfigurationEntry getConfEntry(final String confStr, final String oldConfStr) { ConfigurationEntry entry = new ConfigurationEntry(); entry.setConf(JRaftUtils.getConfiguration(confStr)); @@ -159,4 +175,8 @@ public static byte[] getRandomBytes() { ThreadLocalRandom.current().nextBytes(requestContext); return requestContext; } + + public static Future runInThread(final Runnable runnable) { + return executor.submit(runnable); + } } diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/RheaKVTestCluster.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/RheaKVTestCluster.java index ba21eb049..c576d8c09 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/RheaKVTestCluster.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/RheaKVTestCluster.java @@ -22,7 +22,14 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadLocalRandom; - +import java.util.concurrent.ThreadPoolExecutor; + +import com.alipay.sofa.jraft.util.concurrent.FixedThreadsExecutorGroup; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.module.SimpleModule; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,6 +100,22 @@ protected void shutdown(final boolean deleteFiles) throws Exception { private RheaKVStoreOptions readOpts(final String conf) throws IOException { final ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + SimpleModule module = new SimpleModule(); + module.addDeserializer(ThreadPoolExecutor.class, new JsonDeserializer() { + @Override + public ThreadPoolExecutor deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, + JsonProcessingException { + return null; + } + }); + module.addDeserializer(FixedThreadsExecutorGroup.class, new JsonDeserializer() { + @Override + public FixedThreadsExecutorGroup deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, + JsonProcessingException { + return null; + } + }); + mapper.findAndRegisterModules().registerModule(module); try (final InputStream in = RheaKVTestCluster.class.getResourceAsStream(conf)) { return mapper.readValue(in, RheaKVStoreOptions.class); } diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/YamlTest.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/YamlTest.java index ff9e2efb5..8c9d39de9 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/YamlTest.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/YamlTest.java @@ -18,7 +18,14 @@ import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.ThreadPoolExecutor; +import com.alipay.sofa.jraft.util.concurrent.FixedThreadsExecutorGroup; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.module.SimpleModule; import org.junit.Test; import com.alipay.sofa.jraft.rhea.options.RheaKVStoreOptions; @@ -34,6 +41,22 @@ public class YamlTest { public void parseStoreEngineOptionsTest() throws IOException { final ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); final InputStream in = YamlTest.class.getResourceAsStream("/conf/rhea_test_cluster_1.yaml"); + SimpleModule module = new SimpleModule(); + module.addDeserializer(ThreadPoolExecutor.class, new JsonDeserializer() { + @Override + public ThreadPoolExecutor deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, + JsonProcessingException { + return null; + } + }); + module.addDeserializer(FixedThreadsExecutorGroup.class, new JsonDeserializer() { + @Override + public FixedThreadsExecutorGroup deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, + JsonProcessingException { + return null; + } + }); + mapper.findAndRegisterModules().registerModule(module); final RheaKVStoreOptions opts = mapper.readValue(in, RheaKVStoreOptions.class); System.out.println(opts); } diff --git a/jraft-test/src/main/java/com/alipay/sofa/jraft/test/atomic/server/AtomicStateMachine.java b/jraft-test/src/main/java/com/alipay/sofa/jraft/test/atomic/server/AtomicStateMachine.java index f4fb1e100..e7add8b5f 100644 --- a/jraft-test/src/main/java/com/alipay/sofa/jraft/test/atomic/server/AtomicStateMachine.java +++ b/jraft-test/src/main/java/com/alipay/sofa/jraft/test/atomic/server/AtomicStateMachine.java @@ -22,7 +22,12 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicLong; + +import com.alipay.sofa.jraft.util.NamedThreadFactory; +import com.alipay.sofa.jraft.util.ThreadPoolUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alipay.sofa.jraft.Closure; @@ -46,13 +51,25 @@ /** * Atomic state machine - * @author boyan (boyan@alibaba-inc.com) * + * @author boyan (boyan@alibaba-inc.com) + *

* 2018-Apr-25 1:47:50 PM */ public class AtomicStateMachine extends StateMachineAdapter { private static final Logger LOG = LoggerFactory.getLogger(AtomicStateMachine.class); + private static ThreadPoolExecutor executor = ThreadPoolUtil + .newBuilder() + .poolName("JRAFT_TEST_EXECUTOR") + .enableMetric(true) + .coreThreads(3) + .maximumThreads(5) + .keepAliveSeconds(60L) + .workQueue(new SynchronousQueue<>()) + .threadFactory( + new NamedThreadFactory( + "JRaft-Test-Executor-", true)).build(); // private final ConcurrentHashMap counters = new ConcurrentHashMap<>(); @@ -154,24 +171,24 @@ public long getValue(final String key) throws KeyNotFoundException { } @Override - public void onSnapshotSave(final SnapshotWriter writer, final Closure done) { - final Map values = new HashMap<>(); - for (final Map.Entry entry : this.counters.entrySet()) { - values.put(entry.getKey(), entry.getValue().get()); - } - Utils.runInThread(() -> { - final AtomicSnapshotFile snapshot = new AtomicSnapshotFile(writer.getPath() + File.separator + "data"); - if (snapshot.save(values)) { - if (writer.addFile("data")) { - done.run(Status.OK()); - } else { - done.run(new Status(RaftError.EIO, "Fail to add file to writer")); + public void onSnapshotSave(final SnapshotWriter writer, final Closure done) { + final Map values = new HashMap<>(); + for (final Map.Entry entry : this.counters.entrySet()) { + values.put(entry.getKey(), entry.getValue().get()); } - } else { - done.run(new Status(RaftError.EIO, "Fail to save counter snapshot %s", snapshot.getPath())); - } - }); - } + executor.submit(() -> { + final AtomicSnapshotFile snapshot = new AtomicSnapshotFile(writer.getPath() + File.separator + "data"); + if (snapshot.save(values)) { + if (writer.addFile("data")) { + done.run(Status.OK()); + } else { + done.run(new Status(RaftError.EIO, "Fail to add file to writer")); + } + } else { + done.run(new Status(RaftError.EIO, "Fail to save counter snapshot %s", snapshot.getPath())); + } + }); + } @Override public void onError(final RaftException e) {