diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/entity/Ballot.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/Quorum.java similarity index 65% rename from jraft-core/src/main/java/com/alipay/sofa/jraft/entity/Ballot.java rename to jraft-core/src/main/java/com/alipay/sofa/jraft/Quorum.java index 5f9f1eb0b..f58240496 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/entity/Ballot.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/Quorum.java @@ -14,21 +14,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.alipay.sofa.jraft.entity; +package com.alipay.sofa.jraft; + +import com.alipay.sofa.jraft.conf.Configuration; +import com.alipay.sofa.jraft.entity.PeerId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; -import com.alipay.sofa.jraft.conf.Configuration; - /** - * A ballot to vote. - * - * @author boyan (boyan@alibaba-inc.com) - * - * 2018-Mar-15 2:29:11 PM + * @author Akai */ -public class Ballot { +public abstract class Quorum { + private static final Logger LOG = LoggerFactory.getLogger(Quorum.class); + + protected final List peers = new ArrayList<>(); + + protected int quorum; + + protected final List oldPeers = new ArrayList<>(); + protected int oldQuorum; public static final class PosHint { int pos0 = -1; // position in current peers @@ -48,41 +55,9 @@ public UnfoundPeerId(PeerId peerId, int index, boolean found) { } } - private final List peers = new ArrayList<>(); - private int quorum; - private final List oldPeers = new ArrayList<>(); - private int oldQuorum; + public abstract boolean init(final Configuration conf, final Configuration oldConf); - /** - * Init the ballot with current conf and old conf. - * - * @param conf current configuration - * @param oldConf old configuration - * @return true if init success - */ - public boolean init(final Configuration conf, final Configuration oldConf) { - this.peers.clear(); - this.oldPeers.clear(); - this.quorum = this.oldQuorum = 0; - int index = 0; - if (conf != null) { - for (final PeerId peer : conf) { - this.peers.add(new UnfoundPeerId(peer, index++, false)); - } - } - - this.quorum = this.peers.size() / 2 + 1; - if (oldConf == null) { - return true; - } - index = 0; - for (final PeerId peer : oldConf) { - this.oldPeers.add(new UnfoundPeerId(peer, index++, false)); - } - - this.oldQuorum = this.oldPeers.size() / 2 + 1; - return true; - } + public abstract void grant(final PeerId peerId); private UnfoundPeerId findPeer(final PeerId peerId, final List peers, final int posHint) { if (posHint < 0 || posHint >= peers.size() || !peers.get(posHint).peerId.equals(peerId)) { @@ -97,6 +72,15 @@ private UnfoundPeerId findPeer(final PeerId peerId, final List pe return peers.get(posHint); } + /** + * Returns true when the ballot is granted. + * + * @return true if the ballot is granted + */ + public boolean isGranted() { + return quorum <= 0 && oldQuorum <= 0; + } + public PosHint grant(final PeerId peerId, final PosHint hint) { UnfoundPeerId peer = findPeer(peerId, this.peers, hint.pos0); if (peer != null) { @@ -125,17 +109,4 @@ public PosHint grant(final PeerId peerId, final PosHint hint) { return hint; } - - public void grant(final PeerId peerId) { - grant(peerId, new PosHint()); - } - - /** - * Returns true when the ballot is granted. 
- * - * @return true if the ballot is granted - */ - public boolean isGranted() { - return this.quorum <= 0 && this.oldQuorum <= 0; - } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/BallotBox.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/BallotBox.java index d24223beb..d16f6bc2b 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/BallotBox.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/BallotBox.java @@ -20,6 +20,8 @@ import javax.annotation.concurrent.ThreadSafe; +import com.alipay.sofa.jraft.Quorum; +import com.alipay.sofa.jraft.entity.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,8 +30,6 @@ import com.alipay.sofa.jraft.Lifecycle; import com.alipay.sofa.jraft.closure.ClosureQueue; import com.alipay.sofa.jraft.conf.Configuration; -import com.alipay.sofa.jraft.entity.Ballot; -import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.option.BallotBoxOptions; import com.alipay.sofa.jraft.util.Describer; import com.alipay.sofa.jraft.util.OnlyForTest; @@ -38,8 +38,9 @@ /** * Ballot box for voting. - * @author boyan (boyan@alibaba-inc.com) * + * @author boyan (boyan@alibaba-inc.com) + *
* 2018-Apr-04 2:32:10 PM */ @ThreadSafe @@ -52,7 +53,7 @@ public class BallotBox implements Lifecycle, Describer { private final StampedLock stampedLock = new StampedLock(); private long lastCommittedIndex = 0; private long pendingIndex; - private final SegmentList pendingMetaQueue = new SegmentList<>(false); + private final SegmentList pendingMetaQueue = new SegmentList<>(false); private BallotBoxOptions opts; @OnlyForTest @@ -61,7 +62,7 @@ long getPendingIndex() { } @OnlyForTest - SegmentList getPendingMetaQueue() { + SegmentList getPendingMetaQueue() { return this.pendingMetaQueue; } @@ -112,14 +113,15 @@ public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final } final long startAt = Math.max(this.pendingIndex, firstLogIndex); - Ballot.PosHint hint = new Ballot.PosHint(); + Quorum.PosHint hint = new Quorum.PosHint(); for (long logIndex = startAt; logIndex <= lastLogIndex; logIndex++) { - final Ballot bl = this.pendingMetaQueue.get((int) (logIndex - this.pendingIndex)); - hint = bl.grant(peer, hint); - if (bl.isGranted()) { + final Quorum quorum = this.pendingMetaQueue.get((int) (logIndex - this.pendingIndex)); + hint = quorum.grant(peer, hint); + if (quorum.isGranted()) { lastCommittedIndex = logIndex; } } + if (lastCommittedIndex == 0) { return true; } @@ -164,6 +166,7 @@ public void clearPendingTasks() { * According the the raft algorithm, the logs from previous terms can't be * committed until a log at the new term becomes committed, so * |newPendingIndex| should be |last_log_index| + 1. + * * @param newPendingIndex pending index of new leader * @return returns true if reset success */ @@ -192,14 +195,18 @@ public boolean resetPendingIndex(final long newPendingIndex) { * Called by leader, otherwise the behavior is undefined * Store application context before replication. * - * @param conf current configuration - * @param oldConf old configuration - * @param done callback - * @return returns true on success + * @param conf current configuration + * @param oldConf old configuration + * @param done callback + * @param quorumConfiguration quorum configuration + * @return returns true on success */ - public boolean appendPendingTask(final Configuration conf, final Configuration oldConf, final Closure done) { - final Ballot bl = new Ballot(); - if (!bl.init(conf, oldConf)) { + public boolean appendPendingTask(final Configuration conf, final Configuration oldConf, final Closure done, + final QuorumConfiguration quorumConfiguration) { + final Quorum quorum; + quorum = quorumConfiguration.isEnableNWR() ? 
new NWRQuorum(quorumConfiguration.getWriteFactor(), + quorumConfiguration.getReadFactor()) : new MajorityQuorum(); + if (!quorum.init(conf, oldConf)) { LOG.error("Fail to init ballot."); return false; } @@ -209,7 +216,7 @@ public boolean appendPendingTask(final Configuration conf, final Configuration o LOG.error("Node {} fail to appendingTask, pendingIndex={}.", this.opts.getNodeId(), this.pendingIndex); return false; } - this.pendingMetaQueue.add(bl); + this.pendingMetaQueue.add(quorum); this.closureQueue.appendPendingClosure(done); return true; } finally { diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java index ae1c0df4a..40cd7ef21 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java @@ -16,6 +16,8 @@ */ package com.alipay.sofa.jraft.core; +import java.math.BigDecimal; +import java.math.RoundingMode; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; @@ -34,19 +36,12 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.stream.Collectors; +import com.alipay.sofa.jraft.*; +import com.alipay.sofa.jraft.entity.*; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.alipay.sofa.jraft.Closure; -import com.alipay.sofa.jraft.FSMCaller; -import com.alipay.sofa.jraft.JRaftServiceFactory; -import com.alipay.sofa.jraft.JRaftUtils; -import com.alipay.sofa.jraft.Node; -import com.alipay.sofa.jraft.NodeManager; -import com.alipay.sofa.jraft.ReadOnlyService; -import com.alipay.sofa.jraft.ReplicatorGroup; -import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.closure.CatchUpClosure; import com.alipay.sofa.jraft.closure.ClosureQueue; import com.alipay.sofa.jraft.closure.ClosureQueueImpl; @@ -55,16 +50,6 @@ import com.alipay.sofa.jraft.conf.Configuration; import com.alipay.sofa.jraft.conf.ConfigurationEntry; import com.alipay.sofa.jraft.conf.ConfigurationManager; -import com.alipay.sofa.jraft.entity.Ballot; -import com.alipay.sofa.jraft.entity.EnumOutter; -import com.alipay.sofa.jraft.entity.LeaderChangeContext; -import com.alipay.sofa.jraft.entity.LogEntry; -import com.alipay.sofa.jraft.entity.LogId; -import com.alipay.sofa.jraft.entity.NodeId; -import com.alipay.sofa.jraft.entity.PeerId; -import com.alipay.sofa.jraft.entity.RaftOutter; -import com.alipay.sofa.jraft.entity.Task; -import com.alipay.sofa.jraft.entity.UserLog; import com.alipay.sofa.jraft.error.LogIndexOutOfBoundsException; import com.alipay.sofa.jraft.error.LogNotFoundException; import com.alipay.sofa.jraft.error.OverloadException; @@ -135,7 +120,7 @@ * The raft replica node implementation. * * @author boyan (boyan@alibaba-inc.com) - * + *
* 2018-Apr-03 4:26:51 PM */ public class NodeImpl implements Node, RaftServerService { @@ -164,7 +149,9 @@ public class NodeImpl implements Node, RaftServerService { public static final AtomicInteger GLOBAL_NUM_NODES = new AtomicInteger( 0); - /** Internal states */ + /** + * Internal states + */ private final ReadWriteLock readWriteLock = new NodeReadWriteLock( this); protected final Lock writeLock = this.readWriteLock @@ -177,16 +164,21 @@ public class NodeImpl implements Node, RaftServerService { private volatile long lastLeaderTimestamp; private PeerId leaderId = new PeerId(); private PeerId votedId; - private final Ballot voteCtx = new Ballot(); - private final Ballot prevVoteCtx = new Ballot(); + private Quorum voteCtx; + private Quorum prevVoteCtx; + private ConfigurationEntry conf; private StopTransferArg stopTransferArg; - /** Raft group and node options and identifier */ + /** + * Raft group and node options and identifier + */ private final String groupId; private NodeOptions options; private RaftOptions raftOptions; private final PeerId serverId; - /** Other services */ + /** + * Other services + */ private final ConfigurationCtx confCtx; private LogStorage logStorage; private RaftMetaStorage metaStorage; @@ -200,7 +192,9 @@ public class NodeImpl implements Node, RaftServerService { private final List shutdownContinuations = new ArrayList<>(); private RaftClientService rpcService; private ReadOnlyService readOnlyService; - /** Timers */ + /** + * Timers + */ private Scheduler timerManager; private RepeatedTimer electionTimer; private RepeatedTimer voteTimer; @@ -208,21 +202,31 @@ public class NodeImpl implements Node, RaftServerService { private RepeatedTimer snapshotTimer; private ScheduledFuture transferTimer; private ThreadId wakingCandidate; - /** Disruptor to run node service */ + /** + * Disruptor to run node service + */ private Disruptor applyDisruptor; private RingBuffer applyQueue; - /** Metrics */ + /** + * Metrics + */ private NodeMetrics metrics; private NodeId nodeId; private JRaftServiceFactory serviceFactory; - /** ReplicatorStateListeners */ + /** + * ReplicatorStateListeners + */ private final CopyOnWriteArrayList replicatorStateListeners = new CopyOnWriteArrayList<>(); - /** Node's target leader election priority value */ + /** + * Node's target leader election priority value + */ private volatile int targetPriority; - /** The number of elections time out for current node */ + /** + * The number of elections time out for current node + */ private volatile int electionTimeoutCounter; private static class NodeReadWriteLock extends LongHeldDetectingReadWriteLock { @@ -256,7 +260,7 @@ public void report(final AcquireMode acquireMode, final Thread heldThread, * Node service event. * * @author boyan (boyan@alibaba-inc.com) - * + *
* 2018-Apr-03 4:29:55 PM */ private static class LogEntryAndClosure { @@ -285,7 +289,7 @@ public LogEntryAndClosure newInstance() { * Event handler. * * @author boyan (boyan@alibaba-inc.com) - * + *
* 2018-Apr-03 4:30:07 PM */ private class LogEntryAndClosureHandler implements EventHandler { @@ -325,7 +329,7 @@ private void reset() { * Configuration commit context. * * @author boyan (boyan@alibaba-inc.com) - * + *
* 2018-Apr-03 4:29:38 PM */ private static class ConfigurationCtx { @@ -707,7 +711,6 @@ private void decayTargetPriority() { * then compute and update the target priority value. * * @param inLock whether the writeLock has already been locked in other place. - * */ private void checkAndSetConfiguration(final boolean inLock) { if (!inLock) { @@ -738,7 +741,6 @@ private void checkAndSetConfiguration(final boolean inLock) { * Get max priority value for all nodes in the same Raft group, and update current node's target priority value. * * @param peerIds peer nodes in the same Raft group - * */ private int getMaxPriorityOfNodes(final List peerIds) { Requires.requireNonNull(peerIds, "Null peer list"); @@ -887,6 +889,31 @@ private int randomTimeout(final int timeoutMs) { return ThreadLocalRandom.current().nextInt(timeoutMs, timeoutMs + this.raftOptions.getMaxElectionDelayMs()); } + private boolean checkAndResetFactor(Integer writeFactor, Integer readFactor) { + if (Objects.nonNull(readFactor) && Objects.nonNull(writeFactor)) { + if (readFactor + writeFactor != 10) { + LOG.error("The sum of readFactor and writeFactor should be 10"); + return false; + } + return true; + } + if (Objects.nonNull(readFactor)) { + if (readFactor > 0 && readFactor < 10) { + options.setWriteQuorumFactor(10 - readFactor); + return true; + } + LOG.error("Fail to set quorum_nwr read_factor because {} is not between (0,10)", readFactor); + } + if (Objects.nonNull(writeFactor)) { + if (writeFactor > 0 && writeFactor < 10) { + options.setReadQuorumFactor(10 - writeFactor); + return true; + } + LOG.error("Fail to set quorum_nwr write_factor because {} is not between (0,10)", writeFactor); + } + return false; + } + @Override public boolean init(final NodeOptions opts) { Requires.requireNonNull(opts, "Null node options"); @@ -899,6 +926,11 @@ public boolean init(final NodeOptions opts) { this.serverId.setPriority(opts.getElectionPriority()); this.electionTimeoutCounter = 0; + if (options.isEnableFlexibleRaft() + && !checkAndResetFactor(options.getWriteQuorumFactor(), options.getReadQuorumFactor())) { + return false; + } + if (this.serverId.getIp().equals(Utils.IP_ANY)) { LOG.error("Node can't started from IP_ANY."); return false; @@ -1013,6 +1045,12 @@ protected int adjustTimeout(final int timeoutMs) { LOG.error("Node {} initFSMCaller failed.", getNodeId()); return false; } + + prevVoteCtx = options.isEnableFlexibleRaft() ? new NWRQuorum(opts.getReadQuorumFactor(), + opts.getWriteQuorumFactor()) : new MajorityQuorum(); + voteCtx = options.isEnableFlexibleRaft() ? new NWRQuorum(opts.getReadQuorumFactor(), + opts.getWriteQuorumFactor()) : new MajorityQuorum(); + this.ballotBox = new BallotBox(); final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions(); ballotBoxOpts.setWaiter(this.fsmCaller); @@ -1147,7 +1185,8 @@ private void electSelf() { this.votedId = this.serverId.copy(); LOG.debug("Node {} start vote timer, term={} .", getNodeId(), this.currTerm); this.voteTimer.start(); - this.voteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf()); + + voteCtx.init(this.conf.getConf(), this.conf.isStable() ? 
null : this.conf.getOldConf()); oldTerm = this.currTerm; } finally { this.writeLock.unlock(); @@ -1184,8 +1223,8 @@ private void electSelf() { } this.metaStorage.setTermAndVotedFor(this.currTerm, this.serverId); - this.voteCtx.grant(this.serverId); - if (this.voteCtx.isGranted()) { + voteCtx.grant(this.serverId); + if (voteCtx.isGranted()) { becomeLeader(); } } finally { @@ -1385,17 +1424,20 @@ private void executeApplyingTasks(final List tasks) { final LogEntryAndClosure task = tasks.get(i); if (task.expectedTerm != -1 && task.expectedTerm != this.currTerm) { LOG.debug("Node {} can't apply task whose expectedTerm={} doesn't match currTerm={}.", getNodeId(), - task.expectedTerm, this.currTerm); + task.expectedTerm, this.currTerm); if (task.done != null) { final Status st = new Status(RaftError.EPERM, "expected_term=%d doesn't match current_term=%d", - task.expectedTerm, this.currTerm); + task.expectedTerm, this.currTerm); ThreadPoolsFactory.runClosureInThread(this.groupId, task.done, st); task.reset(); } continue; } + if (!this.ballotBox.appendPendingTask(this.conf.getConf(), - this.conf.isStable() ? null : this.conf.getOldConf(), task.done)) { + this.conf.isStable() ? null : this.conf.getOldConf(), task.done,options.isEnableFlexibleRaft() ? + QuorumFactory.createNWRQuorumConfiguration(options.getWriteQuorumFactor(), options.getReadQuorumFactor()): + QuorumFactory.createMajorityQuorumConfiguration())) { ThreadPoolsFactory.runClosureInThread(this.groupId, task.done, new Status(RaftError.EINTERNAL, "Fail to append task.")); task.reset(); continue; @@ -1426,8 +1468,9 @@ public NodeMetrics getNodeMetrics() { /** * Returns the JRaft service factory for current node. - * @since 1.2.6 + * * @return the service factory + * @since 1.2.6 */ public JRaftServiceFactory getServiceFactory() { return this.serviceFactory; @@ -1451,6 +1494,7 @@ public void readIndex(ReadOnlyOption readOnlyOptions, byte[] requestContext, Rea /** * ReadIndex response closure + * * @author dennis */ private class ReadIndexHeartbeatResponseClosure extends RpcResponseClosureAdapter { @@ -1469,7 +1513,8 @@ public ReadIndexHeartbeatResponseClosure(final RpcResponseClosure closure) { @@ -1552,7 +1603,7 @@ private void readFollower(final ReadIndexRequest request, final RpcResponseClosu private void readLeader(final ReadIndexRequest request, final ReadIndexResponse.Builder respBuilder, final RpcResponseClosure closure) { - final int quorum = getQuorum(); + final int quorum = getReadQuorum(); if (quorum <= 1) { // Only one peer, fast path. respBuilder.setSuccess(true) // @@ -1574,7 +1625,7 @@ private void readLeader(final ReadIndexRequest request, final ReadIndexResponse. } respBuilder.setIndex(lastCommittedIndex); - if (request.getPeerId() != null) { + if (Objects.nonNull(request.getPeerId())) { // request from follower or learner, check if the follower/learner is in current conf. 
final PeerId peer = new PeerId(); peer.parse(request.getServerId()); @@ -1627,29 +1678,29 @@ public void apply(final Task task) { entry.setData(task.getData()); final EventTranslator translator = (event, sequence) -> { - event.reset(); - event.done = task.getDone(); - event.entry = entry; - event.expectedTerm = task.getExpectedTerm(); + event.reset(); + event.done = task.getDone(); + event.entry = entry; + event.expectedTerm = task.getExpectedTerm(); }; - switch(this.options.getApplyTaskMode()) { - case Blocking: - this.applyQueue.publishEvent(translator); - break; - case NonBlocking: - default: - if (!this.applyQueue.tryPublishEvent(translator)) { - String errorMsg = "Node is busy, has too many tasks, queue is full and bufferSize="+ this.applyQueue.getBufferSize(); - ThreadPoolsFactory.runClosureInThread(this.groupId, task.getDone(), - new Status(RaftError.EBUSY, errorMsg)); - LOG.warn("Node {} applyQueue is overload.", getNodeId()); - this.metrics.recordTimes("apply-task-overload-times", 1); - if(task.getDone() == null) { - throw new OverloadException(errorMsg); - } - } - break; + switch (this.options.getApplyTaskMode()) { + case Blocking: + this.applyQueue.publishEvent(translator); + break; + case NonBlocking: + default: + if (!this.applyQueue.tryPublishEvent(translator)) { + String errorMsg = "Node is busy, has too many tasks, queue is full and bufferSize=" + this.applyQueue.getBufferSize(); + ThreadPoolsFactory.runClosureInThread(this.groupId, task.getDone(), + new Status(RaftError.EBUSY, errorMsg)); + LOG.warn("Node {} applyQueue is overload.", getNodeId()); + this.metrics.recordTimes("apply-task-overload-times", 1); + if (task.getDone() == null) { + throw new OverloadException(errorMsg); + } + } + break; } } @@ -2146,8 +2197,9 @@ void increaseTermTo(final long newTerm, final Status status) { /** * Peer catch up callback - * @author boyan (boyan@alibaba-inc.com) * + * @author boyan (boyan@alibaba-inc.com) + *
* 2018-Apr-11 2:10:02 PM */ private static class OnCaughtUp extends CatchUpClosure { @@ -2220,8 +2272,10 @@ private boolean checkDeadNodes(final Configuration conf, final long monotonicNow LOG.warn("Node {} steps down when alive nodes don't satisfy quorum, term={}, deadNodes={}, conf={}.", getNodeId(), this.currTerm, deadNodes, conf); final Status status = new Status(); - status.setError(RaftError.ERAFTTIMEDOUT, "Majority of the group dies: %d/%d", deadNodes.size(), - peers.size()); + String msg = options.isEnableFlexibleRaft() ? "Reading quorum does not meet availability conditions: " + + getReadQuorum() + ", Some nodes in the cluster dies" + : "Majority of the group dies"; + status.setError(RaftError.ERAFTTIMEDOUT, "%s: %d/%d", msg, deadNodes.size(), peers.size()); stepDown(this.currTerm, false, status); } return false; @@ -2252,7 +2306,8 @@ private boolean checkDeadNodes0(final List peers, final long monotonicNo deadNodes.addPeer(peer); } } - if (aliveCount >= peers.size() / 2 + 1) { + + if (aliveCount >= getReadQuorum()) { updateLastLeaderTimestamp(startLease); return true; } @@ -2320,7 +2375,7 @@ private void handleStepDownTimeout() { * Configuration changed callback. * * @author boyan (boyan@alibaba-inc.com) - * + *
* 2018-Apr-11 2:53:43 PM */ private class ConfigurationChangeDone implements Closure { @@ -2359,7 +2414,12 @@ private void unsafeApplyConfiguration(final Configuration newConf, final Configu } final ConfigurationChangeDone configurationChangeDone = new ConfigurationChangeDone(this.currTerm, leaderStart); // Use the new_conf to deal the quorum of this very log - if (!this.ballotBox.appendPendingTask(newConf, oldConf, configurationChangeDone)) { + if (!this.ballotBox.appendPendingTask( + newConf, + oldConf, + configurationChangeDone, + options.isEnableFlexibleRaft() ? QuorumFactory.createNWRQuorumConfiguration(options.getWriteQuorumFactor(), + options.getReadQuorumFactor()) : QuorumFactory.createMajorityQuorumConfiguration())) { ThreadPoolsFactory.runClosureInThread(this.groupId, configurationChangeDone, new Status( RaftError.EINTERNAL, "Fail to append task.")); return; @@ -2561,8 +2621,8 @@ public void handleRequestVoteResponse(final PeerId peerId, final long term, fina } // check granted quorum? if (response.getGranted()) { - this.voteCtx.grant(peerId); - if (this.voteCtx.isGranted()) { + voteCtx.grant(peerId); + if (voteCtx.isGranted()) { becomeLeader(); } } @@ -2623,8 +2683,8 @@ public void handlePreVoteResponse(final PeerId peerId, final long term, final Re response.getTerm(), response.getGranted()); // check granted quorum? if (response.getGranted()) { - this.prevVoteCtx.grant(peerId); - if (this.prevVoteCtx.isGranted()) { + prevVoteCtx.grant(peerId); + if (prevVoteCtx.isGranted()) { doUnlock = false; electSelf(); } @@ -2691,7 +2751,7 @@ private void preVote() { LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm); return; } - this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf()); + prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf()); for (final PeerId peer : this.conf.listPeers()) { if (peer.equals(this.serverId)) { continue; @@ -2712,8 +2772,8 @@ private void preVote() { .build(); this.rpcService.preVote(peer.getEndpoint(), done.request, done); } - this.prevVoteCtx.grant(this.serverId); - if (this.prevVoteCtx.isGranted()) { + prevVoteCtx.grant(this.serverId); + if (prevVoteCtx.isGranted()) { doUnlock = false; electSelf(); } @@ -2806,7 +2866,7 @@ public void shutdown(Closure done) { final CountDownLatch latch = new CountDownLatch(1); this.shutdownLatch = latch; ThreadPoolsFactory.runInThread(this.groupId, - () -> this.applyQueue.publishEvent((event, sequence) -> event.shutdownLatch = latch)); + () -> this.applyQueue.publishEvent((event, sequence) -> event.shutdownLatch = latch)); } else { final int num = GLOBAL_NUM_NODES.decrementAndGet(); LOG.info("The number of active nodes decrement to {}.", num); @@ -2833,18 +2893,18 @@ public void shutdown(Closure done) { // Call join() asynchronously final Closure shutdownHook = done; ThreadPoolsFactory.runInThread(this.groupId, () -> { - try { - join(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } finally { - // This node is down, it's ok to invoke done right now. Don't invoke this - // in place to avoid the dead writeLock issue when done.Run() is going to acquire - // a writeLock which is already held by the caller - if (shutdownHook != null) { - shutdownHook.run(Status.OK()); + try { + join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + // This node is down, it's ok to invoke done right now. 
Don't invoke this + // in place to avoid the dead writeLock issue when done.Run() is going to acquire + // a writeLock which is already held by the caller + if (shutdownHook != null) { + shutdownHook.run(Status.OK()); + } } - } }); } @@ -2943,7 +3003,6 @@ private void onTransferTimeout(final StopTransferArg arg) { * retrieve cluster peers info, you should use {@link #listPeers()} instead. * * @return current configuration. - * * @since 1.0.3 */ public Configuration getCurrentConf() { @@ -3194,7 +3253,7 @@ public Status transferLeadershipTo(final PeerId peer) { try { if (this.state != State.STATE_LEADER) { LOG.warn("Node {} can't transfer leadership to peer {} as it is in state {}.", getNodeId(), peer, - this.state); + this.state); return new Status(this.state == State.STATE_TRANSFERRING ? RaftError.EBUSY : RaftError.EPERM, "Not a leader"); } @@ -3212,8 +3271,8 @@ public Status transferLeadershipTo(final PeerId peer) { // completed so that the peer's configuration is up-to-date when it // receives the TimeOutNowRequest. LOG.warn( - "Node {} refused to transfer leadership to peer {} when the leader is changing the configuration.", - getNodeId(), peer); + "Node {} refused to transfer leadership to peer {} when the leader is changing the configuration.", + getNodeId(), peer); return new Status(RaftError.EBUSY, "Changing the configuration"); } @@ -3232,7 +3291,7 @@ public Status transferLeadershipTo(final PeerId peer) { } if (!this.conf.contains(peerId)) { LOG.info("Node {} refused to transfer leadership to peer {} as it is not in {}.", getNodeId(), peer, - this.conf); + this.conf); return new Status(RaftError.EINVAL, "Not in current configuration"); } @@ -3243,13 +3302,13 @@ public Status transferLeadershipTo(final PeerId peer) { } this.state = State.STATE_TRANSFERRING; final Status status = new Status(RaftError.ETRANSFERLEADERSHIP, - "Raft leader is transferring leadership to %s", peerId); + "Raft leader is transferring leadership to %s", peerId); onLeaderStop(status); LOG.info("Node {} starts to transfer leadership to peer {}.", getNodeId(), peer); final StopTransferArg stopArg = new StopTransferArg(this, this.currTerm, peerId); this.stopTransferArg = stopArg; this.transferTimer = this.timerManager.schedule(() -> onTransferTimeout(stopArg), - this.options.getElectionTimeoutMs(), TimeUnit.MILLISECONDS); + this.options.getElectionTimeoutMs(), TimeUnit.MILLISECONDS); } finally { this.writeLock.unlock(); diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/entity/MajorityQuorum.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/entity/MajorityQuorum.java new file mode 100644 index 000000000..15e4980cf --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/entity/MajorityQuorum.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.entity; + +import com.alipay.sofa.jraft.Quorum; +import com.alipay.sofa.jraft.conf.Configuration; + +/** + * A ballot to vote. + * + * @author boyan (boyan@alibaba-inc.com) + *
+ * 2018-Mar-15 2:29:11 PM + */ +public class MajorityQuorum extends Quorum { + + /** + * Init the ballot with current conf and old conf. + * + * @param conf current configuration + * @param oldConf old configuration + * @return true if init success + */ + @Override + public boolean init(final Configuration conf, final Configuration oldConf) { + peers.clear(); + oldPeers.clear(); + quorum = oldQuorum = 0; + + int index = 0; + if (conf != null) { + for (final PeerId peer : conf) { + peers.add(new UnfoundPeerId(peer, index++, false)); + } + } + + quorum = peers.size() / 2 + 1; + if (oldConf == null) { + return true; + } + + index = 0; + for (final PeerId peer : oldConf) { + oldPeers.add(new UnfoundPeerId(peer, index++, false)); + } + + oldQuorum = oldPeers.size() / 2 + 1; + return true; + } + + @Override + public void grant(final PeerId peerId) { + super.grant(peerId, new PosHint()); + } + +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/entity/NWRQuorum.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/entity/NWRQuorum.java new file mode 100644 index 000000000..77dd869ef --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/entity/NWRQuorum.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alipay.sofa.jraft.entity; + +import com.alipay.sofa.jraft.Quorum; +import com.alipay.sofa.jraft.conf.Configuration; + +import java.math.BigDecimal; +import java.math.RoundingMode; + +/** + * @author Akai + */ +public class NWRQuorum extends Quorum { + + protected Integer readFactor; + protected Integer writeFactor; + private static final String defaultDecimalFactor = "0.1"; + private static final BigDecimal defaultDecimal = new BigDecimal(defaultDecimalFactor); + + public NWRQuorum(Integer writeFactor, Integer readFactor) { + this.writeFactor = writeFactor; + this.readFactor = readFactor; + } + + @Override + public boolean init(Configuration conf, Configuration oldConf) { + peers.clear(); + oldPeers.clear(); + quorum = oldQuorum = 0; + int index = 0; + + if (conf != null) { + for (final PeerId peer : conf) { + peers.add(new UnfoundPeerId(peer, index++, false)); + } + } + + BigDecimal writeFactorDecimal = defaultDecimal.multiply(new BigDecimal(writeFactor)).multiply( + new BigDecimal(peers.size())); + quorum = writeFactorDecimal.setScale(0, RoundingMode.CEILING).intValue(); + + if (oldConf == null) { + return true; + } + index = 0; + for (final PeerId peer : oldConf) { + oldPeers.add(new UnfoundPeerId(peer, index++, false)); + } + + BigDecimal writeFactorOldDecimal = defaultDecimal.multiply(new BigDecimal(writeFactor)).multiply( + new BigDecimal(oldPeers.size())); + oldQuorum = writeFactorOldDecimal.setScale(0, RoundingMode.CEILING).intValue(); + return true; + } + + @Override + public void grant(final PeerId peerId) { + super.grant(peerId, new Quorum.PosHint()); + } + +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/entity/QuorumConfiguration.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/entity/QuorumConfiguration.java new file mode 100644 index 000000000..c27f83c04 --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/entity/QuorumConfiguration.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alipay.sofa.jraft.entity; + +/** + * @author Akai + */ +public class QuorumConfiguration { + private Integer readFactor; + private Integer writeFactor; + private boolean isEnableNWR; + + public Integer getReadFactor() { + return readFactor; + } + + public void setReadFactor(Integer readFactor) { + this.readFactor = readFactor; + } + + public Integer getWriteFactor() { + return writeFactor; + } + + public void setWriteFactor(Integer writeFactor) { + this.writeFactor = writeFactor; + } + + public boolean isEnableNWR() { + return isEnableNWR; + } + + public void setEnableNWR(boolean enableNWR) { + isEnableNWR = enableNWR; + } +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/entity/QuorumFactory.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/entity/QuorumFactory.java new file mode 100644 index 000000000..cc43ab591 --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/entity/QuorumFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.entity; + +/** + * @author Akai + */ +public final class QuorumFactory { + public static QuorumConfiguration createNWRQuorumConfiguration(Integer writeFactor, Integer readFactor) { + boolean isEnableNWR = true; + QuorumConfiguration quorumConfiguration = new QuorumConfiguration(); + quorumConfiguration.setReadFactor(readFactor); + quorumConfiguration.setWriteFactor(writeFactor); + quorumConfiguration.setEnableNWR(isEnableNWR); + return quorumConfiguration; + } + + public static QuorumConfiguration createMajorityQuorumConfiguration() { + boolean isEnableNWR = false; + QuorumConfiguration quorumConfiguration = new QuorumConfiguration(); + quorumConfiguration.setEnableNWR(isEnableNWR); + return quorumConfiguration; + } +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java index 81ef572dc..21cc83503 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java @@ -166,6 +166,18 @@ public class NodeOptions extends RpcOptions implements Copiable { */ private boolean sharedSnapshotTimer = false; + /** + * Read Quorum's factor + */ + private Integer readQuorumFactor; + /** + * Write Quorum's factor + */ + private Integer writeQuorumFactor; + /** + * Enable NWRMode or Not + */ + private boolean enableFlexibleRaft = false; /** * Custom service factory. 
*/ @@ -425,6 +437,32 @@ public void setSharedSnapshotTimer(final boolean sharedSnapshotTimer) { this.sharedSnapshotTimer = sharedSnapshotTimer; } + public Integer getReadQuorumFactor() { + return readQuorumFactor; + } + + public void setReadQuorumFactor(int readQuorumFactor) { + this.readQuorumFactor = readQuorumFactor; + enableFlexibleRaft(); + } + + public Integer getWriteQuorumFactor() { + return writeQuorumFactor; + } + + public void setWriteQuorumFactor(int writeQuorumFactor) { + this.writeQuorumFactor = writeQuorumFactor; + enableFlexibleRaft(); + } + + public boolean isEnableFlexibleRaft() { + return enableFlexibleRaft; + } + + private void enableFlexibleRaft() { + this.enableFlexibleRaft = true; + } + @Override public NodeOptions copy() { final NodeOptions nodeOptions = new NodeOptions(); @@ -453,7 +491,11 @@ public NodeOptions copy() { nodeOptions.setRpcProcessorThreadPoolSize(super.getRpcProcessorThreadPoolSize()); nodeOptions.setEnableRpcChecksum(super.isEnableRpcChecksum()); nodeOptions.setMetricRegistry(super.getMetricRegistry()); - + if (nodeOptions.isEnableFlexibleRaft()) { + nodeOptions.enableFlexibleRaft(); + nodeOptions.setWriteQuorumFactor(this.writeQuorumFactor); + nodeOptions.setReadQuorumFactor(this.readQuorumFactor); + } return nodeOptions; } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/BallotBoxTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/BallotBoxTest.java index e5fccfc7c..6ec00748b 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/BallotBoxTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/BallotBoxTest.java @@ -16,6 +16,7 @@ */ package com.alipay.sofa.jraft.core; +import com.alipay.sofa.jraft.entity.QuorumFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -81,7 +82,7 @@ public void testAppendPendingTask() { public void run(Status status) { } - })); + }, QuorumFactory.createMajorityQuorumConfiguration())); assertTrue(box.resetPendingIndex(1)); assertTrue(this.box.appendPendingTask( JRaftUtils.getConfiguration("localhost:8081,localhost:8082,localhost:8083"), @@ -91,7 +92,7 @@ public void run(Status status) { public void run(Status status) { } - })); + }, QuorumFactory.createMajorityQuorumConfiguration())); assertEquals(1, this.box.getPendingMetaQueue().size()); assertEquals(1, this.closureQueue.getQueue().size()); @@ -118,7 +119,7 @@ public void testCommitAt() { public void run(Status status) { } - })); + }, QuorumFactory.createMajorityQuorumConfiguration())); assertEquals(0, this.box.getLastCommittedIndex()); try { this.box.commitAt(1, 3, new PeerId("localhost", 8081)); diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java index 213bb10d0..e68b9df16 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java @@ -3527,8 +3527,8 @@ public void testChangePeersChaosApplyTasks() throws Exception { final Node leader = cluster.getLeader(); leader.changePeers(new Configuration(peers), done); try { - Status status = done.await(); - assertTrue(status.getErrorMsg(), status.isOk()); + Status status = done.await(); + assertTrue(status.getErrorMsg(), status.isOk()); cluster.ensureSame(); assertEquals(10, cluster.getFsms().size()); for (final MockStateMachine fsm : cluster.getFsms()) { diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/BallotTest.java 
b/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/MajorityQuorumTest.java similarity index 67% rename from jraft-core/src/test/java/com/alipay/sofa/jraft/entity/BallotTest.java rename to jraft-core/src/test/java/com/alipay/sofa/jraft/entity/MajorityQuorumTest.java index f82388baa..a4888c1be 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/BallotTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/MajorityQuorumTest.java @@ -24,28 +24,28 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -public class BallotTest { +public class MajorityQuorumTest { - private Ballot ballot; + private MajorityQuorum majorityQuorum; @Before public void setup() { - this.ballot = new Ballot(); - this.ballot.init(JRaftUtils.getConfiguration("localhost:8081,localhost:8082,localhost:8083"), null); + this.majorityQuorum = new MajorityQuorum(); + this.majorityQuorum.init(JRaftUtils.getConfiguration("localhost:8081,localhost:8082,localhost:8083"), null); } @Test public void testGrant() { PeerId peer1 = new PeerId("localhost", 8081); - this.ballot.grant(peer1); - assertFalse(this.ballot.isGranted()); + this.majorityQuorum.grant(peer1); + assertFalse(this.majorityQuorum.isGranted()); - PeerId unfoundPeer = new PeerId("localhost", 8084); - this.ballot.grant(unfoundPeer); - assertFalse(this.ballot.isGranted()); + PeerId unFoundPeer = new PeerId("localhost", 8084); + this.majorityQuorum.grant(unFoundPeer); + assertFalse(this.majorityQuorum.isGranted()); PeerId peer2 = new PeerId("localhost", 8082); - this.ballot.grant(peer2); - assertTrue(this.ballot.isGranted()); + this.majorityQuorum.grant(peer2); + assertTrue(this.majorityQuorum.isGranted()); } } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/NWRQuorumTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/NWRQuorumTest.java new file mode 100644 index 000000000..cf1bbca90 --- /dev/null +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/NWRQuorumTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alipay.sofa.jraft.entity; + +import com.alipay.sofa.jraft.JRaftUtils; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * @author Akai + */ +public class NWRQuorumTest { + private NWRQuorum nwrQuorum; + private final Integer readFactor = 4; + private final Integer writeFactor = 6; + + @Before + public void setup() { + this.nwrQuorum = new NWRQuorum(writeFactor, readFactor); + this.nwrQuorum.init(JRaftUtils.getConfiguration("localhost:8081,localhost:8082,localhost:8083," + + "localhost:8084,localhost:8085"), null); + + } + + @Test + public void testGrant() { + PeerId peer1 = new PeerId("localhost", 8081); + this.nwrQuorum.grant(peer1); + assertFalse(this.nwrQuorum.isGranted()); + + PeerId peer2 = new PeerId("localhost", 8082); + this.nwrQuorum.grant(peer2); + assertFalse(this.nwrQuorum.isGranted()); + + PeerId unfoundPeer = new PeerId("localhost", 8086); + this.nwrQuorum.grant(unfoundPeer); + assertFalse(this.nwrQuorum.isGranted()); + + PeerId peer3 = new PeerId("localhost", 8083); + this.nwrQuorum.grant(peer3); + assertTrue(this.nwrQuorum.isGranted()); + } +} diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/option/NodeOptionsTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/option/NodeOptionsTest.java index ab603417e..5c47b986e 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/option/NodeOptionsTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/option/NodeOptionsTest.java @@ -51,5 +51,6 @@ public void testCopyRpcOptionsRight() { assertEquals(90, copy.getRpcProcessorThreadPoolSize()); assertTrue(copy.isEnableRpcChecksum()); assertNotNull(copy.getMetricRegistry()); + assertFalse(copy.isEnableFlexibleRaft()); } } \ No newline at end of file diff --git a/rfcs/0002-Flexible-Raft-with-NWR.md b/rfcs/0002-Flexible-Raft-with-NWR.md new file mode 100644 index 000000000..e8dc7962f --- /dev/null +++ b/rfcs/0002-Flexible-Raft-with-NWR.md @@ -0,0 +1,546 @@ +- Feature Name: Implementing Flexible Raft with NWR +- Author: yzw 1294566108@qq.com +- Date: 2023-06-28 +- RFC PR: # +- Issue Link: https://github.com/sofastack/sofa-jraft/issues/1003 + +## Table of Contents + +* [Summary](#Summary) +* [Motivation](#Motivation) +* [Key Design](#Key-design) +* [Detailed Design](#Detailed-design) + +## Summary + +我们希望在原始RAFT算法的基础上,让Leader选举与日志复制除了有多数派确认模型的支持之外,还可以接入NWRQuorum模型,用于动态调整一致性强度。 + +## Motivation + +在原始的RAFT算法中,Leader选举和日志复制都需要获得多数派成员的支持。而NWR模型则可以在动态调整一致性强度的场景中使用,它需要满足W+R>N,以保证强一致性。 +JRaft将RAFT和NWR结合起来,使得用户可以根据不同的业务需求来动态调整Quorum的数量。例如,在一个写多读少的场景中,用户可以将多数派的数量从3调整为2,以降低达成共识的条件,从而提高写请求的效率。同时,为了保证RAFT的正确性,写Quorum的调整需要付出代价,即读Quorum的数量也需要相应调整。 +JRaft支持成员变更,因此用户可以配置(0,1]范围内的小数来计算W和R的具体值。通过使用JRaft,用户可以根据自己的业务需求来灵活地调整一致性强度,使得分布式系统在不同场景下都可以获得最佳的性能和正确性。 + +## Key design + +下图为NWR模型Quroum的设计思路: + +- 抽离出抽象父类Quorum作为MajorityQuorum(多数派确认模型,原Ballot类)和NWRQuorum(NWR模型)的模板。 +- 用户首先需要在NodeOptions类中决定是否开启NWR模式,默认为多数派模型,用户手动设置读写Factor因子后,则视为开启了NWR模式。 +- NodeImpl#init方法进行逻辑处理,判断是否开启了NWR模式,对读写因子进行校验与同步,然后构造对应的Quorum选票实例,例如MajorityQuorum与NWRQuorum。 +- 在构建好选票实例之后,调用对应的方法可以进行选票的初始化(init)、投票(grant)等操作。 + +![](https://img2023.cnblogs.com/blog/2784327/202307/2784327-20230701144543736-883375762.png) + +该项目涉及代码变更的地方可以划分为如下四个模块: + +- **Leader选举模块:** 一个节点想要成为leader,会经过以下几个阶段:预投票、正式投票、当选leader。所以对于preVote、electSelf、becomeLeader等与多数派模型相关的方法都会涉及NWR模型的有关代码变更。 +- **日志复制模块:** 当leader收到客户端的事务请求或者follower与leader数据存在差距时,会调用 **Replicator#sendEntries** 去发送日志复制消息(事务消息);而心跳消息和探测消息,则是由 
**Replicator#sendEmptyEntries** 发送的。日志复制中,**NodeImpl#executeApplyingTasks 和** **NodeImpl#unsafeApplyConfiguration** 方法会涉及到多数派确认。在执行这些方法的时候,都会使用 **BallotBox#appendPendingTask** 方法来构造一个待投票的Ballot(现在叫MajorityQuorum/NWRQuorum)并放置到投票箱中。 +- **一致性读模块:** 对于一致性读模块,在raft共识算法中,读取R个节点其实体现在R个节点的心跳响应。通过R个节点的心跳,能保证这个节点一定是leader,一定拥有最新的数据,我们并不是真正需要从R个节点里面读取数据。**NodeImpl#ReadIndexHeartbeatResponseClosure** 这样的方法,可以看到执行了心跳消息的多数派确认模型的逻辑,ReadIndexHeartbeatResponseClosure构造器里面传入了quorum的值,这里我们需要对应修改为NWR模型的逻辑。 +- **成员变更模块:** 对于JRaft成员变更来讲,核心逻辑是采用单成员变更的方式,即使需要同时变更多个成员时,也是会先整理出新add与新remove的成员,再逐个进行单成员变更。其核心方法 **addPeer、removePeer、changePeers、resetPeers** 等等都会涉及NWR模型的适配。另外对于stepDownTimer计时器,它会处理那些下线的节点,对于stepDown而言的多数派逻辑也需要修改。 + +![](https://img2023.cnblogs.com/blog/2784327/202307/2784327-20230701144554871-1252032815.png) +## Detailed Design + +### NodeOptions + +在**NodeOptions类**中,我们新增了如下三个参数:readQuorumFactor、writeQuorumFactor与enableFlexibleRaft,分别表示读因子、写因子以及是否开启NWR模型(true),默认不开启,表示多数派确认模型(false) + +``` + /** + * Read Quorum's factor + */ + private Integer readQuorumFactor; + /** + * Write Quorum's factor + */ + private Integer writeQuorumFactor; + /** + * Enable NWRMode or Not + */ + private boolean enableFlexibleRaft = false; +``` + +对于readQuorumFactor和writeQuorumFactor两个属性,在NodeOptions类里提供了setter和getter方法便于用户自定义配置。对于enableFlexibleRaft属性,提供了isEnableFlexibleRaft()来判断是否开启NWR模型,而enableFlexibleRaft()方法表示开启NWR模式。 + +``` + public Integer getReadQuorumFactor() { + return readQuorumFactor; + } + + public void setReadQuorumFactor(int readQuorumFactor) { + this.readQuorumFactor = readQuorumFactor; + enableFlexibleRaft(); + } + + public Integer getWriteQuorumFactor() { + return writeQuorumFactor; + } + + public void setWriteQuorumFactor(int writeQuorumFactor) { + this.writeQuorumFactor = writeQuorumFactor; + enableFlexibleRaft(); + } + + public boolean isEnableFlexibleRaft() { + return enableFlexibleRaft; + } + + private void enableFlexibleRaft() { + this.enableFlexibleRaft = true; + } +``` + +### Node Init + +在**NodeImpl#init**时,我们首先会对NodeOptions内的readFactor和writeFactor进行校验并且进行参数同步,如果用户只设置了readFactor和writeFactor两个参数的其中之一,那么我们需要同步这两个参数的值。 +在init方法初始化node时,会首先对NWR模式下的factor进行校验与同步。 + +``` +if(options.isEnableFlexibleRaft() && !checkAndResetFactor(options.getWriteQuorumFactor(), + options.getReadQuorumFactor())){ + return false; +} +``` + +校验与同步逻辑在方法checkAndResetFactor里: + +``` + private boolean checkAndResetFactor(Integer writeFactor, Integer readFactor){ + if (Objects.nonNull(readFactor) && Objects.nonNull(writeFactor)) { + if (readFactor + writeFactor != 10) { + LOG.error("The sum of readFactor and writeFactor should be 10"); + return false; + } + return true; + } + if (Objects.nonNull(readFactor)) { + if (readFactor > 0 && readFactor < 10) { + options.setWriteQuorumFactor(10 - readFactor); + return true; + } + LOG.error("Fail to set quorum_nwr read_factor because {} is not between (0,10)", readFactor); + } + if (Objects.nonNull(writeFactor)) { + if (writeFactor > 0 && writeFactor < 10) { + options.setReadQuorumFactor(10 - writeFactor); + return true; + } + LOG.error("Fail to set quorum_nwr write_factor because {} is not between (0,10)", writeFactor); + } + return false; + } +``` + +在以往多数派确认模型中,node初始化时生成Ballot对象是通过关键字直接new出来的,如下所示: + +``` +private final Ballot voteCtx = new Ballot(); +private final Ballot prevVoteCtx = new Ballot(); +``` + +添加NWR模型后,我们需要判断,到底是生成MajorityQuorum还是NWRQuorum。所以在对节点进行初始化时(NodeImpl#init),会根据NodeOptions判断是否开启NWR模型,进而构造对应实例。 + +``` +prevVoteCtx = options.isEnableFlexibleRaft() ? 
new NWRQuorum(opts.getReadQuorumFactor(), opts.getWriteQuorumFactor()) + : new MajorityQuorum(); +voteCtx = options.isEnableFlexibleRaft() ? new NWRQuorum(opts.getReadQuorumFactor(), opts.getWriteQuorumFactor()) + : new MajorityQuorum(); +``` + +### Quorum Detail + +#### Quorum + +Quorum作为NWRQuorum与MajorityQuorum的抽象父类,持有peers、oldPeers、quorum、oldQuorum几个公共属性。 + +``` +protected final List peers = new ArrayList<>() +protected int quorum; +protected final List oldPeers = new ArrayList<>(); +protected int oldQuorum; +``` + +Quorum提供了grant和init两个抽象方法,子类实现该抽象方法的具体业务逻辑。 + +``` +public abstract boolean init(final Configuration conf, final Configuration oldConf); +public abstract void grant(final PeerId peerId); +``` + +Quorum还定义了findPeer、isGranted、grant这三个包含方法体的父类方法。 + +``` +public PosHint grant(final PeerId peerId, final PosHint hint){ + //此处省略方法体 +} +public boolean isGranted() { + //此处省略方法体 +} +private UnfoundPeerId findPeer(final PeerId peerId, final List peers, final int posHint){ + //此处省略方法体 +} +``` + +#### NWRQuorum + +NWRQuorum作为NWR模型选票实现类,持有readFactor、writeFactor等几个属性,他们代表读写因子。 + +``` + protected Integer readFactor; ---读因子 + protected Integer writeFactor; ---写因子 + private static final String defaultDecimalFactor = "0.1"; + private static final BigDecimal defaultDecimal = new BigDecimal(defaultDecimalFactor); +``` + +另外,我们提供了一个NWRQuorum的构造器用于构造NWRQuorum实例,需要传入writeFactor, readFactor两个参数。 + +``` + public NWRQuorum(Double writeFactor, Double readFactor) { + this.writeFactor = writeFactor; + this.readFactor = readFactor; + } +``` + +我们也实现了抽象父类的init与grant方法, + +``` +public boolean init(Configuration conf, Configuration oldConf) ---初始化选票 +public void grant(final PeerId peerId) ---节点投票 +``` + +对于NWRQuorum的init()方法来讲,他对于quorum的计算与以往有所不同,代码如下: + +``` + @Override + public boolean init(Configuration conf, Configuration oldConf) { + peers.clear(); + oldPeers.clear(); + quorum = oldQuorum = 0; + int index = 0; + + if (conf != null) { + for (final PeerId peer : conf) { + peers.add(new UnfoundPeerId(peer, index++, false)); + } + } + + BigDecimal writeFactorDecimal = defaultDecimal.multiply(new BigDecimal(writeFactor)) + .multiply(new BigDecimal(peers.size())); + quorum = writeFactorDecimal.setScale(0, RoundingMode.CEILING).intValue(); + + if (oldConf == null) { + return true; + } + index = 0; + for (final PeerId peer : oldConf) { + oldPeers.add(new UnfoundPeerId(peer, index++, false)); + } + + BigDecimal writeFactorOldDecimal = defaultDecimal.multiply(new BigDecimal(writeFactor)) + .multiply(new BigDecimal(oldPeers.size())); + oldQuorum = writeFactorOldDecimal.setScale(0, RoundingMode.CEILING).intValue(); + return true; + } +``` + +#### MajorityQuorum + +MajorityQuorum实现了Quorum抽象父类的两个方法,init方法初始化选票需要参数,grant方法用于投票。 + +``` +public boolean init(final Configuration conf, final Configuration oldConf) ---初始化选票 +public void grant(final PeerId peerId) ---节点投票 +``` + +### Module Detail + +#### **Leader-election Module** + +一个节点成为leader会经过以下几个阶段:预投票、正式投票、当选leader。 +首先我们来看预投票NodeImpl#preVote()方法,大概经历以下几个过程: + +1. 校验是否可以开启预投票,安装快照或者集群配置不包含本节点都不可以开启预投票。 +2. 初始化预投票-投票箱。 +3. 遍历,给除了本节点之外的所有其他节点发起RequestVoteRequest--RPC请求。 +4. 给自己投票,并判断是否已经达到多数派。 + + +其中**预投票**有以下几处核心代码涉及投票: + +- **在NodeImpl#preVote()中**,调用Quorum#init()方法初始化预投票-投票箱 + +``` +prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? 
null : this.conf.getOldConf()); +``` + +- **在NodeImpl#preVote()中**,本节点自己投票,在判断投票箱达到多数派后开启正式选举,调用electSelf() + +``` + prevVoteCtx.grant(this.serverId); + if (prevVoteCtx.isGranted()) { + doUnlock = false; + electSelf(); + } +``` + +- **在NodeImpl#handlePreVoteResponse中**,该方法用来处理预投票响应:首先根据响应判断对方节点是否为本节点投票,在判断为true后,Quorum(即prevVoteCtx)调用grant()对该节点进行授权投票。最后通过isGranted()判断是否大多数节点已经确认,如果符合条件,则开启正式选举模式,调用electSelf()方法。 + +``` + // check granted quorum? + if (response.getGranted()) { + prevVoteCtx.grant(peerId); + if (prevVoteCtx.isGranted()) { + doUnlock = false; + electSelf(); + } + } +``` + +- 接下来多数派确认后,执行NodeImpl#electSelf()方法,它做了以下几件事: + +1. 检验当前节点是否存在集群配置里面,不存在不进行选举。 +2. 关闭预选举定时器。 +3. 清空leader,增加任期,修改状态为candidate,votedId设置为当前本节点。 +4. 启动投票定时器voteTimer,因为可能投票失败需要循环发起投票,voteTimer里面会根据当前的CANDIDATE状态调用electSelf进行选举。 +5. 初始化投票箱。 +6. 遍历所有节点,向其他集群节点发送RequestVoteRequest--RPC请求,请求被RequestVoteRequestProcessor处理器处理的。 +7. 如果多数派确认,则调用NodeImpl#becomeLeader晋升为leader。 + +``` + voteCtx.grant(this.serverId); + if (voteCtx.isGranted()) { + becomeLeader(); + } +``` + +- **在NodeImpl#handleRequestVoteResponse中**,该方法用来处理投票请求的响应。只要收到投票的反馈,就会在投票箱中对多数派进行确认,如果已经达成多数派确认的共识,那么本节点就调用NodeImpl#becomeLeader方法成为leader。投票请求处理器NodeImpl#handleRequestVoteResponse方法对选票处理的核心逻辑如下: + +``` + // check granted quorum? + if (response.getGranted()) { + voteCtx.grant(peerId); + if (voteCtx.isGranted()) { + becomeLeader(); + } + } +``` + +- 在多数派确认后,会调用NodeImpl#becomeLeader方法正式被选举为leader: + +1. 首先会停止选举定时器。 +2. 设置当前的状态为leader。 +3. 设值任期。 +4. 遍历所有的节点将节点加入到复制集群中。 +5. 最后将stepDownTimer打开,定时对leader进行校验是不是又半数以上的节点响应当前的leader。 + +#### Log-replication Module + +当leader收到客户端的事务请求或者follower与leader数据存在差距时,会调用**Replicator#sendEntries**去复制日志,日志复制消息属于事务消息;而心跳消息和探测消息,则是由**Replicator#sendEmptyEntries**发送的。 +日志复制的流程如下: + +1. leader将日志项追加到本地日志 +2. leader将日志广播给follower +3. follower追加到本地日志 +4. follower返回执行结果 +5. leader收到多数派响应后提交日志 +6. 返回执行结果给客户端 + 在JRaft中,我阅读了日志复制模块的源码部分,然后总结出下图来直观的反应整个日志复制从leader到follower再回到leader的整个过程,详细的方法调用链路过程如下所示: + +![](https://img2023.cnblogs.com/blog/2784327/202307/2784327-20230701144642277-1586182824.png) +在日志复制中,以下方法会涉及到多数派确认:NodeImpl#executeApplyingTasks 和NodeImpl#unsafeApplyConfiguration,也就是执行应用任务和应用配置变更所使用到的日志复制。在执行这些方法的时候,都会使用BallotBox#appendPendingTask方法来构造一个待投票的Quorum并放置到投票箱中。 + +**场景一:应用任务** + +我们首先分析NodeImpl#executeApplyingTasks方法: + +1. 检查当前节点是否是 Leader 节点。如果节点不是 Leader 节点,则将所有任务的状态设置为错误并执行相应的回调方法;如果节点正在进行领导权转移,则将所有任务的状态设置为繁忙并执行相应的回调方法。 +2. 遍历任务列表,对于每个任务执行以下操作:a. 检查任务的 expectedTerm 是否与当前任期相同,如果不同则将任务的状态设置为错误并执行相应的回调方法。b. 将任务添加到 BallotBox 中。c. 将任务的日志条目信息添加到一个列表中,并将任务重置为默认状态。 +3. 将任务列表中的所有日志条目追加到当前节点的日志中,并将追加操作封装为 LeaderStableClosure 回调方法。 +4. 检查并更新配置信息,如果需要更新则执行相应的更新操作。 + +**注意:在executeApplyingTasks方法中,根据当前节点配置,生成了一个待投票Quorum,并放置到投票箱BallotBox的pendingMetaQueue中。所以我们需要在这里构造待投票Quorum时,修改quorum为NWR模式,而不是之前的多数派。** + +``` + private void executeApplyingTasks(final List tasks) { + // 省略部分代码... + if (!this.ballotBox.appendPendingTask(this.conf.getConf(), + this.conf.isStable() ? null : this.conf.getOldConf(), task.done,options.isEnableFlexibleRaft() ? + QuorumFactory.createNWRQuorumConfiguration(options.getWriteQuorumFactor(), options.getReadQuorumFactor()): + QuorumFactory.createMajorityQuorumConfiguration())) { + ThreadPoolsFactory.runClosureInThread(this.groupId, task.done, new Status(RaftError.EINTERNAL, "Fail to append task.")); + task.reset(); + continue; + } + // 省略部分代码... + this.logManager.appendEntries(entries, new LeaderStableClosure(entries)); + // 省略部分代码... 
+ } +``` + +**场景二:应用配置** + +接下来看看NodeImpl#unsafeApplyConfiguration是如何构建选票Ballot的: + +下面这段代码的作用是将新配置信息封装成一个日志条目,并追加到当前节点的日志中,从而实现配置变更的操作。其实逻辑和增加普通日志类似,主要需要注意的还是ballotBox.appendPendingTask方法,也就是生成一个待投票Quorum的逻辑。 + +``` + private void unsafeApplyConfiguration(final Configuration newConf, final Configuration oldConf, + final boolean leaderStart) { + // 省略部分代码... + if (!this.ballotBox.appendPendingTask(newConf, oldConf, configurationChangeDone,options.isEnableFlexibleRaft() ? + QuorumFactory.createNWRQuorumConfiguration(options.getWriteQuorumFactor(), options.getReadQuorumFactor()): + QuorumFactory.createMajorityQuorumConfiguration())) { + ThreadPoolsFactory.runClosureInThread(this.groupId, configurationChangeDone, new Status( + RaftError.EINTERNAL, "Fail to append task.")); + return; + } + // 省略部分代码... + this.logManager.appendEntries(entries, new LeaderStableClosure(entries)); + checkAndSetConfiguration(false); + } +``` + +##### QuorumFactory + +在上面的**executeApplyingTasks与unsafeApplyConfiguration**方法中使用到了QuorumFactory这个工厂类的方法。为了更方便配置一个Quorum的属性,可以将factor因子和NWR开关整合到QuorumConfiguration类中,以便于快速构建一个QuorumConfiguration。实现代码如下: + +``` +public final class QuorumFactory { + public static QuorumConfiguration createNWRQuorumConfiguration(Integer writeFactor,Integer readFactor) { + boolean isEnableNWR = true; + QuorumConfiguration quorumConfiguration = new QuorumConfiguration(); + quorumConfiguration.setReadFactor(readFactor); + quorumConfiguration.setWriteFactor(writeFactor); + quorumConfiguration.setEnableNWR(isEnableNWR); + return quorumConfiguration; + } + + public static QuorumConfiguration createMajorityQuorumConfiguration(){ + boolean isEnableNWR = false; + QuorumConfiguration quorumConfiguration = new QuorumConfiguration(); + quorumConfiguration.setEnableNWR(isEnableNWR); + return quorumConfiguration; + } +} +``` + +对于BallotBox#CommitAt来说,在进行确认时,只需要从pendingMetaQueue获取Quorum再进行grant授权投票即可,之后再判断是否已经达到(多数派/NWR)确认。 + +``` + public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final PeerId peer) { + // 省略部分代码... + Quorum.PosHint hint = new Quorum.PosHint(); + for (long logIndex = startAt; logIndex <= lastLogIndex; logIndex++) { + final Quorum quorum = this.pendingMetaQueue.get((int) (logIndex - this.pendingIndex)); + hint = quorum.grant(peer, hint); + if (quorum.isGranted()) { + lastCommittedIndex = logIndex; + } + } + // 省略部分代码... + this.waiter.onCommitted(lastCommittedIndex); + return true; + } +``` + +#### Consistent-reading Module + +对于**ReadIndexHeartbeatResponseClosure**类来讲,他的run方法执行了心跳消息多数派确认逻辑。其构造器内传入的quorum值需要进行NWR模型适配,并且failPeersThreshold属性也需要重新适配计算逻辑。 +原有获取ReadQuorum数值的多数派确认逻辑是: + +``` + private int getQuorum() { + final Configuration c = this.conf.getConf(); + if (c.isEmpty()) { + return 0; + } + return c.getPeers().size() / 2 + 1; + } +``` + +如今我们需要修改该方法,额外对NWR模型进行判断: + +``` + private int getReadQuorum() { + final Configuration c = this.conf.getConf(); + if (c.isEmpty()) { + return 0; + } + int size = c.getPeers().size(); + if(!options.isEnableFlexibleRaft()){ + return size / 2 + 1; + } + int writeQuorum = new BigDecimal("0.1").multiply(new BigDecimal(options.getWriteQuorumFactor())) + .multiply(new BigDecimal(c.getPeers().size())).setScale(0, RoundingMode.CEILING).intValue(); + return size - writeQuorum + 1; + } +``` + +failPeersThreshold原有计算逻辑: + +``` +this.failPeersThreshold = peersCount % 2 == 0 ? (quorum - 1) : quorum; +``` + +修改后: + +``` +this.failPeersThreshold = options.isEnableFlexibleRaft() ? peersCount - quorum + 1 : + (peersCount % 2 == 0 ? 
(quorum - 1) : quorum); +``` +#### Member change + +##### addPeer + +新增成员的逻辑是:基于原conf生成一份新conf,并添加指定peer节点,然后调用unsafeRegisterConfChange方法执行成员配置变更,并且为当前成员增加Replicator对象进行日志复制。对于新增成员来说,需要给他一段学习时间,赶上leader的日志进度,然后才加入集群。 + +##### removePeer + +移除成员的逻辑是:基于原conf生成一份新conf,这份新配置移除了指定的peer,然后调用unsafeRegisterConfChange方法执行成员配置变更,对于移除成员,直接调用nextStage()进入下一阶段记录成员变更日志项。 + +##### changePeers + +与前两者方法类似。不过对于处理多成员变更,也是需要进行逐个成员变更的,也就是所谓的单节点变更。 + +##### resetPeers + +这个方法用于强制变更本节点的配置,单独重置该节点的配置,而在该节点成为领导者之前,无需复制其他同行。 当复制组的大多数已死时,应该调用此功能。在这种情况下,一致性和共识都不能保证,在处理此方法时要小心。 + +##### stepDown +另外,stepDownTimer计时器会处理那些下线的节点。当一个集群中,下线节点数量超过多数派数量时,将会导致整个集群不可用,在**NodeImpl#checkDeadNodes0**方法中,会校验已经死亡的节点,其中涉及到的多数派模型代码如下: + + +``` + if (aliveCount >= peers.size() / 2 + 1) { + updateLastLeaderTimestamp(startLease); + return true; + } +``` + +由于加入NWR模型,我们需要修改为 + +``` + if (aliveCount >= getReadQuorum()) { + updateLastLeaderTimestamp(startLease); + return true; + } +``` + +另外对于checkDeadNodes方法来讲,如果当下线节点数量不再满足MajorityQuorum或者ReadQuorum时,将会报错并且将leader节点stepDown。 + +``` + if (stepDownOnCheckFail) { + LOG.warn("Node {} steps down when alive nodes don't satisfy quorum, term={}, deadNodes={}, conf={}.", + getNodeId(), this.currTerm, deadNodes, conf); + final Status status = new Status(); + String msg = options.isEnableFlexibleRaft() ? "Reading quorum does not meet availability conditions: " + + getReadQuorum() + ", Some nodes in the cluster dies" : + "Majority of the group dies"; + status.setError(RaftError.ERAFTTIMEDOUT, "%s: %d/%d", msg, + deadNodes.size(), peers.size()); + stepDown(this.currTerm, false, status); + } +``` + +1 \ No newline at end of file
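
### Worked Example (sketch)

A minimal sketch of the quorum arithmetic, not part of the patch itself; it only assumes the APIs added in this diff (`NodeOptions` factor setters, `NWRQuorum`, `JRaftUtils.getConfiguration`) and the hypothetical class name `FlexibleQuorumSketch`. It walks a 5-node group with writeFactor=6 / readFactor=4 through `NWRQuorum#init` and `grant`:

```
import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.entity.NWRQuorum;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.NodeOptions;

public class FlexibleQuorumSketch {

    public static void main(String[] args) {
        // Setting either factor flips NodeOptions#enableFlexibleRaft to true;
        // NodeImpl#init later checks that readFactor + writeFactor == 10.
        final NodeOptions opts = new NodeOptions();
        opts.setWriteQuorumFactor(6);
        opts.setReadQuorumFactor(4);

        // For a 5-node group: write quorum = ceil(0.1 * 6 * 5) = 3,
        // and the read quorum used by read-index / lease checks is N - W + 1 = 3.
        final NWRQuorum quorum = new NWRQuorum(opts.getWriteQuorumFactor(), opts.getReadQuorumFactor());
        quorum.init(JRaftUtils.getConfiguration(
            "localhost:8081,localhost:8082,localhost:8083,localhost:8084,localhost:8085"), null);

        quorum.grant(new PeerId("localhost", 8081));
        quorum.grant(new PeerId("localhost", 8082));
        System.out.println(quorum.isGranted()); // false: only 2 of the 3 required acks
        quorum.grant(new PeerId("localhost", 8083));
        System.out.println(quorum.isGranted()); // true: write quorum reached
    }
}
```

With these factors a 5-node group still needs 3 acks, the same as the majority rule; lowering writeFactor is what shifts the trade-off. For example, writeFactor=4 / readFactor=6 on 5 nodes gives a write quorum of ceil(0.1 * 4 * 5) = 2 and a read quorum of 5 - 2 + 1 = 4, so W + R = 6 > N = 5 still holds.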