From 7797e91cd31a6a6497bbc56508a16ef0135aab04 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Tue, 12 Mar 2019 16:54:35 +0800 Subject: [PATCH 1/9] (fix) fix stale read with lease read --- .../com/alipay/sofa/jraft/core/NodeImpl.java | 29 +++++++++++++++++-- .../alipay/sofa/jraft/option/NodeOptions.java | 20 +++++++++++++ 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java index b42b802fb..045fd3446 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java @@ -67,6 +67,7 @@ import com.alipay.sofa.jraft.option.LogManagerOptions; import com.alipay.sofa.jraft.option.NodeOptions; import com.alipay.sofa.jraft.option.RaftOptions; +import com.alipay.sofa.jraft.option.ReadOnlyOption; import com.alipay.sofa.jraft.option.ReadOnlyServiceOptions; import com.alipay.sofa.jraft.option.ReplicatorGroupOptions; import com.alipay.sofa.jraft.option.SnapshotExecutorOptions; @@ -1234,11 +1235,18 @@ private void readLeader(ReadIndexRequest request, final ReadIndexResponse.Builde } } - switch (this.raftOptions.getReadOnlyOptions()) { + ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions(); + if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased + && !checkLeaderLease(this.conf.getConf(), Utils.nowMs())) { + // If leader lease timeout, we must change option to ReadOnlySafe + readOnlyOpt = ReadOnlyOption.ReadOnlySafe; + } + + switch (readOnlyOpt) { case ReadOnlySafe: - final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure( - closure, respBuilder, quorum, this.conf.getConf().getPeers().size()); final List peers = this.conf.getConf().getPeers(); + final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure( + closure, respBuilder, quorum, peers.size()); Requires.requireNonNull(peers, "Peer is null"); Requires.requireTrue(!peers.isEmpty(), "Peer is empty"); @@ -1266,6 +1274,21 @@ private void readLeader(ReadIndexRequest request, final ReadIndexResponse.Builde } } + private boolean checkLeaderLease(final Configuration conf, final long nowMs) { + final List peers = conf.listPeers(); + int aliveCount = 0; + for (final PeerId peer : peers) { + if (peer.equals(this.serverId)) { + aliveCount++; + continue; + } + if (nowMs - this.replicatorGroup.getLastRpcSendTimestamp(peer) <= this.options.getLeaderLeaseTimeoutMs()) { + aliveCount++; + } + } + return aliveCount >= peers.size() / 2 + 1; + } + @Override public void apply(final Task task) { if (this.shutdownLatch != null) { diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java index 816930012..618e0df6b 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java @@ -36,6 +36,10 @@ public class NodeOptions extends RpcOptions { // Default: 1000 (1s) private int electionTimeoutMs = 1000; // follower to candidate timeout + // Leader lease time's ratio of electionTimeoutMs + // Default: 90, Max: 100 + private int leaderLeaseTimeRatio = 90; + // A snapshot saving would be triggered every |snapshot_interval_s| seconds // if this was reset as a positive number // If |snapshot_interval_s| <= 0, the time based snapshot would be disabled. @@ -182,6 +186,22 @@ public void setElectionTimeoutMs(int electionTimeoutMs) { this.electionTimeoutMs = electionTimeoutMs; } + public int getLeaderLeaseTimeRatio() { + return leaderLeaseTimeRatio; + } + + public void setLeaderLeaseTimeRatio(int leaderLeaseTimeRatio) { + if (leaderLeaseTimeRatio <= 0 || leaderLeaseTimeRatio >= 100) { + throw new IllegalArgumentException("leaderLeaseTimeRatio: " + leaderLeaseTimeRatio + + " (expected: 0 < leaderLeaseTimeRatio < 100)"); + } + this.leaderLeaseTimeRatio = leaderLeaseTimeRatio; + } + + public int getLeaderLeaseTimeoutMs() { + return getElectionTimeoutMs() * getLeaderLeaseTimeRatio() / 100; + } + public int getSnapshotIntervalSecs() { return this.snapshotIntervalSecs; } From 0763c889c4b7666d060ed565cbe74bd989b9e492 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Tue, 12 Mar 2019 17:23:11 +0800 Subject: [PATCH 2/9] (fix) use monotonic clock on rpc_send_time --- .../alipay/sofa/jraft/ReplicatorGroup.java | 2 +- .../com/alipay/sofa/jraft/core/NodeImpl.java | 20 +++++++++------- .../alipay/sofa/jraft/core/Replicator.java | 24 +++++++++---------- .../sofa/jraft/core/ReplicatorTest.java | 21 ++++++++-------- 4 files changed, 35 insertions(+), 32 deletions(-) diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/ReplicatorGroup.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/ReplicatorGroup.java index 916bc7466..ecc74a117 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/ReplicatorGroup.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/ReplicatorGroup.java @@ -93,7 +93,7 @@ public interface ReplicatorGroup { boolean waitCaughtUp(PeerId peer, long maxMargin, long dueTime, CatchUpClosure done); /** - * Get peer's last rpc send timestamp. + * Get peer's last rpc send timestamp (monotonic time in milliseconds). * * @param peer the peer of replicator */ diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java index 045fd3446..2c6a09dd2 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java @@ -1237,7 +1237,7 @@ private void readLeader(ReadIndexRequest request, final ReadIndexResponse.Builde ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions(); if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased - && !checkLeaderLease(this.conf.getConf(), Utils.nowMs())) { + && !checkLeaderLease(this.conf.getConf(), Utils.monotonicMs())) { // If leader lease timeout, we must change option to ReadOnlySafe readOnlyOpt = ReadOnlyOption.ReadOnlySafe; } @@ -1274,7 +1274,7 @@ private void readLeader(ReadIndexRequest request, final ReadIndexResponse.Builde } } - private boolean checkLeaderLease(final Configuration conf, final long nowMs) { + private boolean checkLeaderLease(final Configuration conf, final long monotonicNowMs) { final List peers = conf.listPeers(); int aliveCount = 0; for (final PeerId peer : peers) { @@ -1282,7 +1282,8 @@ private boolean checkLeaderLease(final Configuration conf, final long nowMs) { aliveCount++; continue; } - if (nowMs - this.replicatorGroup.getLastRpcSendTimestamp(peer) <= this.options.getLeaderLeaseTimeoutMs()) { + if (monotonicNowMs - this.replicatorGroup.getLastRpcSendTimestamp(peer) <= this.options + .getLeaderLeaseTimeoutMs()) { aliveCount++; } } @@ -1740,7 +1741,8 @@ private void onCaughtUp(PeerId peer, long term, long version, Status st) { } // Retry if this peer is still alive if (st.getCode() == RaftError.ETIMEDOUT.getNumber() - && Utils.nowMs() - replicatorGroup.getLastRpcSendTimestamp(peer) <= options.getElectionTimeoutMs()) { + && Utils.monotonicMs() - replicatorGroup.getLastRpcSendTimestamp(peer) <= options + .getElectionTimeoutMs()) { LOG.debug("Node {} waits peer {} to catch up", getNodeId(), peer); final OnCaughtUp caughtUp = new OnCaughtUp(this, term, peer, version); final long dueTime = Utils.nowMs() + options.getElectionTimeoutMs(); @@ -1757,7 +1759,7 @@ private void onCaughtUp(PeerId peer, long term, long version, Status st) { } } - private void checkDeadNodes(Configuration conf, long nowMs) { + private void checkDeadNodes(Configuration conf, long monotonicNowMs) { final List peers = conf.listPeers(); int aliveCount = 0; final Configuration deadNodes = new Configuration(); @@ -1767,7 +1769,7 @@ private void checkDeadNodes(Configuration conf, long nowMs) { continue; } this.checkReplicator(peer); - if (nowMs - replicatorGroup.getLastRpcSendTimestamp(peer) <= options.getElectionTimeoutMs()) { + if (monotonicNowMs - replicatorGroup.getLastRpcSendTimestamp(peer) <= options.getElectionTimeoutMs()) { aliveCount++; continue; } @@ -1792,10 +1794,10 @@ private void handleStepDownTimeout() { LOG.debug("Node {} term {} stop stepdown timer state is {}", getNodeId(), this.currTerm, this.state); return; } - final long now = Utils.nowMs(); - checkDeadNodes(this.conf.getConf(), now); + final long monotonicNowMs = Utils.monotonicMs(); + checkDeadNodes(this.conf.getConf(), monotonicNowMs); if (!conf.getOldConf().isEmpty()) { - checkDeadNodes(conf.getOldConf(), now); + checkDeadNodes(conf.getOldConf(), monotonicNowMs); } } finally { writeLock.unlock(); diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java index 75be9a0f5..6d8ade989 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java @@ -498,7 +498,7 @@ void installSnapshot() { this.state = State.Snapshot; // noinspection NonAtomicOperationOnVolatileField this.installSnapshotCounter++; - final long sendTime = Utils.nowMs(); + final long monotonicSendTimeMs = Utils.monotonicMs(); final int stateVersion = this.version; final int seq = getAndIncrementReqSeq(); final Future rpcFuture = rpcService.installSnapshot(this.options.getPeerId().getEndpoint(), @@ -507,7 +507,7 @@ void installSnapshot() { @Override public void run(Status status) { onRpcReturned(id, RequestType.Snapshot, status, request, getResponse(), seq, stateVersion, - sendTime); + monotonicSendTimeMs); } }); addInflight(RequestType.Snapshot, this.nextIndex, 0, 0, seq, rpcFuture); @@ -595,7 +595,7 @@ private void sendEmptyEntries(final boolean isHeartbeat, return; } try { - final long sendTime = Utils.nowMs(); + final long monotonicSendTimeMs = Utils.monotonicMs(); final AppendEntriesRequest request = rb.build(); if (isHeartbeat) { @@ -610,7 +610,7 @@ private void sendEmptyEntries(final boolean isHeartbeat, @Override public void run(Status status) { - onHeartbeatReturned(id, status, request, getResponse(), sendTime); + onHeartbeatReturned(id, status, request, getResponse(), monotonicSendTimeMs); } }; } @@ -631,7 +631,7 @@ public void run(Status status) { @Override public void run(Status status) { onRpcReturned(id, RequestType.AppendEntries, status, request, getResponse(), seq, - stateVersion, sendTime); + stateVersion, monotonicSendTimeMs); } }); @@ -709,9 +709,8 @@ public static ThreadId start(ReplicatorOptions opts, RaftOptions raftOptions) { r.id.lock(); LOG.info("Replicator={}@{} is started", r.id, r.options.getPeerId()); r.catchUpClosure = null; - final long now = Utils.nowMs(); - r.lastRpcSendTimestamp = now; - r.startHeartbeatTimer(now); + r.lastRpcSendTimestamp = Utils.monotonicMs(); + r.startHeartbeatTimer(Utils.nowMs()); //id.unlock in sendEmptyEntries r.sendEmptyEntries(false); return r.id; @@ -985,7 +984,7 @@ static void onHeartbeatReturned(ThreadId id, Status status, AppendEntriesRequest r.notifyOnCaughtUp(RaftError.EPERM.getNumber(), true); r.destroy(); node.increaseTermTo(response.getTerm(), new Status(RaftError.EHIGHERTERMRESPONSE, - "Leader receives higher term hearbeat_response from peer:%s", r.options.getPeerId())); + "Leader receives higher term heartbeat_response from peer:%s", r.options.getPeerId())); return; } if (isLogDebugEnabled) { @@ -1145,7 +1144,7 @@ private static boolean onAppendEntriesReturned(ThreadId id, Inflight inflight, S } //record metrics if (request.getEntriesCount() > 0) { - r.nodeMetrics.recordLatency("replicate-entries", Utils.nowMs() - rpcSendTime); + r.nodeMetrics.recordLatency("replicate-entries", Utils.monotonicMs() - rpcSendTime); r.nodeMetrics.recordSize("replicate-entries-count", request.getEntriesCount()); r.nodeMetrics.recordSize("replicate-entries-bytes", request.getData() != null ? request.getData().size() : 0); @@ -1381,14 +1380,15 @@ private boolean sendEntries(final long nextSendingIndex) { statInfo.lastLogIndex = rb.getPrevLogIndex() + rb.getEntriesCount(); final int v = this.version; - final long sendTimeMs = Utils.nowMs(); + final long monotonicSendTimeMs = Utils.monotonicMs(); final int seq = getAndIncrementReqSeq(); final Future rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(), request, -1, new RpcResponseClosureAdapter() { @Override public void run(Status status) { - onRpcReturned(id, RequestType.AppendEntries, status, request, getResponse(), seq, v, sendTimeMs); + onRpcReturned(id, RequestType.AppendEntries, status, request, getResponse(), seq, v, + monotonicSendTimeMs); } }); diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReplicatorTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReplicatorTest.java index 165fa75d5..b38df3738 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReplicatorTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReplicatorTest.java @@ -51,6 +51,7 @@ import com.alipay.sofa.jraft.storage.SnapshotStorage; import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; import com.alipay.sofa.jraft.util.ThreadId; +import com.alipay.sofa.jraft.util.Utils; import com.google.protobuf.ByteString; import com.google.protobuf.Message; @@ -164,7 +165,7 @@ public void testOnRpcReturnedRpcError() { id.unlock(); Replicator.onRpcReturned(this.id, Replicator.RequestType.AppendEntries, new Status(-1, "test error"), request, - response, 0, 0, System.currentTimeMillis()); + response, 0, 0, Utils.monotonicMs()); assertEquals(r.statInfo.runningState, Replicator.RunningState.BLOCKING); assertNotNull(r.getBlockTimer()); } @@ -179,7 +180,7 @@ public void testOnRpcReturnedTermMismatch() { id.unlock(); Replicator.onRpcReturned(this.id, Replicator.RequestType.AppendEntries, Status.OK(), request, response, 0, 0, - System.currentTimeMillis()); + Utils.monotonicMs()); Mockito.verify(this.node).increaseTermTo( 2, new Status(RaftError.EHIGHERTERMRESPONSE, "Leader receives higher term hearbeat_response from peer:%s", @@ -212,7 +213,7 @@ public void testOnRpcReturnedMoreLogs() { .thenReturn(new FutureImpl<>()); Replicator.onRpcReturned(this.id, Replicator.RequestType.AppendEntries, Status.OK(), request, response, 0, 0, - System.currentTimeMillis()); + Utils.monotonicMs()); assertNotNull(r.getRpcInFly()); assertNotSame(r.getRpcInFly(), rpcInFly); @@ -247,7 +248,7 @@ public void testOnRpcReturnedLessLogs() { .thenReturn(new FutureImpl<>()); Replicator.onRpcReturned(this.id, Replicator.RequestType.AppendEntries, Status.OK(), request, response, 0, 0, - System.currentTimeMillis()); + Utils.monotonicMs()); assertNotNull(r.getRpcInFly()); assertNotSame(r.getRpcInFly(), rpcInFly); @@ -280,7 +281,7 @@ public void run(Status status) { }); Replicator.onRpcReturned(this.id, Replicator.RequestType.AppendEntries, Status.OK(), request, response, 0, 0, - System.currentTimeMillis()); + Utils.monotonicMs()); assertEquals(r.statInfo.runningState, Replicator.RunningState.IDLE); id.unlock(); @@ -395,7 +396,7 @@ public void testOnHeartbeatReturnedRpcError() { final ScheduledFuture timer = r.getHeartbeatTimer(); assertNotNull(timer); Replicator.onHeartbeatReturned(id, new Status(-1, "test"), this.createEmptyEntriesRequestt(), null, - System.currentTimeMillis()); + Utils.monotonicMs()); assertNotNull(r.getHeartbeatTimer()); assertNotSame(timer, r.getHeartbeatTimer()); } @@ -410,7 +411,7 @@ public void testOnHeartbeatReturnedOK() { setSuccess(false). // setLastLogIndex(10).setTerm(1).build(); Replicator.onHeartbeatReturned(id, Status.OK(), this.createEmptyEntriesRequestt(), response, - System.currentTimeMillis()); + Utils.monotonicMs()); assertNotNull(r.getHeartbeatTimer()); assertNotSame(timer, r.getHeartbeatTimer()); } @@ -424,7 +425,7 @@ public void testOnHeartbeatReturnedTermMismatch() { setLastLogIndex(12).setTerm(2).build(); id.unlock(); - Replicator.onHeartbeatReturned(this.id, Status.OK(), request, response, System.currentTimeMillis()); + Replicator.onHeartbeatReturned(this.id, Status.OK(), request, response, Utils.monotonicMs()); Mockito.verify(this.node).increaseTermTo( 2, new Status(RaftError.EHIGHERTERMRESPONSE, "Leader receives higher term hearbeat_response from peer:%s", @@ -665,10 +666,10 @@ public void testOnRpcReturnedOutOfOrder() { assertTrue(r.getPendingResponses().isEmpty()); Replicator.onRpcReturned(this.id, Replicator.RequestType.AppendEntries, Status.OK(), request, response, 1, 0, - System.currentTimeMillis()); + Utils.monotonicMs()); assertEquals(1, r.getPendingResponses().size()); Replicator.onRpcReturned(this.id, Replicator.RequestType.AppendEntries, Status.OK(), request, response, 0, 0, - System.currentTimeMillis()); + Utils.monotonicMs()); assertTrue(r.getPendingResponses().isEmpty()); assertEquals(0, r.getWaitId()); assertEquals(11, r.getRealNextIndex()); From ffaf2e26b2ad06f17114333f88a9d5690e7e1158 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Tue, 12 Mar 2019 18:03:59 +0800 Subject: [PATCH 3/9] (fix) avoid unnecessary copy of peers --- .../src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java index 2c6a09dd2..cfe8acd51 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java @@ -1275,7 +1275,7 @@ private void readLeader(ReadIndexRequest request, final ReadIndexResponse.Builde } private boolean checkLeaderLease(final Configuration conf, final long monotonicNowMs) { - final List peers = conf.listPeers(); + final List peers = conf.getPeers(); int aliveCount = 0; for (final PeerId peer : peers) { if (peer.equals(this.serverId)) { From 5a91617718ecf34da0af38dd93cdd87990373067 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Tue, 12 Mar 2019 19:19:56 +0800 Subject: [PATCH 4/9] (fix) better perf for lease read --- .../com/alipay/sofa/jraft/core/NodeImpl.java | 54 +++++++++---------- .../alipay/sofa/jraft/option/NodeOptions.java | 10 ++-- 2 files changed, 30 insertions(+), 34 deletions(-) diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java index cfe8acd51..34f4f7276 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java @@ -412,7 +412,7 @@ public NodeImpl(String groupId, PeerId serverId) { this.serverId = serverId != null ? serverId.copy() : null; this.state = State.STATE_UNINITIALIZED; this.currTerm = 0; - this.updateLastLeaderTimestamp(); + this.updateLastLeaderTimestamp(Utils.monotonicMs()); this.confCtx = new ConfigurationCtx(this); this.wakingCandidate = null; GLOBAL_NUM_NODES.incrementAndGet(); @@ -484,7 +484,7 @@ private void handleElectionTimeout() { if (this.state != State.STATE_FOLLOWER) { return; } - if (isLeaderLeaseValid()) { + if (isElectionTimeout()) { return; } final PeerId emptyId = new PeerId(); @@ -686,7 +686,11 @@ protected int adjustTimeout(int timeoutMs) { return randomTimeout(timeoutMs); } }; - this.stepDownTimer = new RepeatedTimer("JRaft-StepDownTimer", this.options.getElectionTimeoutMs() >> 1) { + // To minimize the effects of clock drift, we should make sure that: + // stepDownTimeoutMs + leaderLeaseTimeoutMs < electionTimeout + final int stepDownTimeoutMs = (this.options.getElectionTimeoutMs() * Math.max( + 100 - this.options.getLeaderLeaseTimeRatio(), 0)) >> 1; + this.stepDownTimer = new RepeatedTimer("JRaft-StepDownTimer", stepDownTimeoutMs) { @Override protected void onTrigger() { @@ -976,7 +980,7 @@ private void stepDown(long term, boolean wakeupCandidate, Status status) { // soft state in memory this.state = State.STATE_FOLLOWER; this.confCtx.reset(); - this.updateLastLeaderTimestamp(); + this.updateLastLeaderTimestamp(Utils.monotonicMs()); if (this.snapshotExecutor != null) { snapshotExecutor.interruptDownloadingSnapshots(term); } @@ -1236,8 +1240,7 @@ private void readLeader(ReadIndexRequest request, final ReadIndexResponse.Builde } ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions(); - if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased - && !checkLeaderLease(this.conf.getConf(), Utils.monotonicMs())) { + if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased && !isLeaderLeaseValid()) { // If leader lease timeout, we must change option to ReadOnlySafe readOnlyOpt = ReadOnlyOption.ReadOnlySafe; } @@ -1274,22 +1277,6 @@ private void readLeader(ReadIndexRequest request, final ReadIndexResponse.Builde } } - private boolean checkLeaderLease(final Configuration conf, final long monotonicNowMs) { - final List peers = conf.getPeers(); - int aliveCount = 0; - for (final PeerId peer : peers) { - if (peer.equals(this.serverId)) { - aliveCount++; - continue; - } - if (monotonicNowMs - this.replicatorGroup.getLastRpcSendTimestamp(peer) <= this.options - .getLeaderLeaseTimeoutMs()) { - aliveCount++; - } - } - return aliveCount >= peers.size() / 2 + 1; - } - @Override public void apply(final Task task) { if (this.shutdownLatch != null) { @@ -1333,7 +1320,7 @@ public Message handlePreVoteRequest(RequestVoteRequest request) { boolean granted = false; // noinspection ConstantConditions do { - if (this.leaderId != null && !this.leaderId.isEmpty() && isLeaderLeaseValid()) { + if (this.leaderId != null && !this.leaderId.isEmpty() && isElectionTimeout()) { LOG.info( "Node {} ignore PreVote from {} in term {} currTerm {}, because the leader {}'s lease is still valid.", this.getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, this.leaderId); @@ -1378,11 +1365,15 @@ public Message handlePreVoteRequest(RequestVoteRequest request) { } private boolean isLeaderLeaseValid() { + return Utils.monotonicMs() - this.lastLeaderTimestamp < this.options.getLeaderLeaseTimeoutMs(); + } + + private boolean isElectionTimeout() { return Utils.monotonicMs() - this.lastLeaderTimestamp < this.options.getElectionTimeoutMs(); } - private void updateLastLeaderTimestamp() { - this.lastLeaderTimestamp = Utils.monotonicMs(); + private void updateLastLeaderTimestamp(long lastLeaderTimestamp) { + this.lastLeaderTimestamp = lastLeaderTimestamp; } private void checkReplicator(PeerId candidateId) { @@ -1577,7 +1568,7 @@ public Message handleAppendEntriesRequest(AppendEntriesRequest request, RpcReque return responseBuilder.build(); } - this.updateLastLeaderTimestamp(); + this.updateLastLeaderTimestamp(Utils.monotonicMs()); if (entriesCount > 0 && this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) { LOG.warn("Node {} received AppendEntriesRequest while installing snapshot", getNodeId()); @@ -1763,21 +1754,26 @@ private void checkDeadNodes(Configuration conf, long monotonicNowMs) { final List peers = conf.listPeers(); int aliveCount = 0; final Configuration deadNodes = new Configuration(); + long minLastRpcSendTimestamp = Integer.MAX_VALUE; for (final PeerId peer : peers) { if (peer.equals(this.serverId)) { aliveCount++; continue; } - this.checkReplicator(peer); - if (monotonicNowMs - replicatorGroup.getLastRpcSendTimestamp(peer) <= options.getElectionTimeoutMs()) { + checkReplicator(peer); + final long lastRpcSendTimestamp = replicatorGroup.getLastRpcSendTimestamp(peer); + if (monotonicNowMs - lastRpcSendTimestamp <= options.getLeaderLeaseTimeoutMs()) { aliveCount++; + if (minLastRpcSendTimestamp > lastRpcSendTimestamp) { + minLastRpcSendTimestamp = lastRpcSendTimestamp; + } continue; } deadNodes.addPeer(peer); } if (aliveCount >= peers.size() / 2 + 1) { - this.updateLastLeaderTimestamp(); + this.updateLastLeaderTimestamp(minLastRpcSendTimestamp); return; } LOG.warn("Node {} term {} steps down when alive nodes don't satisfy quorum dead nodes: {} conf: {}", diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java index 618e0df6b..000af7714 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java @@ -37,8 +37,8 @@ public class NodeOptions extends RpcOptions { private int electionTimeoutMs = 1000; // follower to candidate timeout // Leader lease time's ratio of electionTimeoutMs - // Default: 90, Max: 100 - private int leaderLeaseTimeRatio = 90; + // Default: 80, Max: 80 + private int leaderLeaseTimeRatio = 80; // A snapshot saving would be triggered every |snapshot_interval_s| seconds // if this was reset as a positive number @@ -191,15 +191,15 @@ public int getLeaderLeaseTimeRatio() { } public void setLeaderLeaseTimeRatio(int leaderLeaseTimeRatio) { - if (leaderLeaseTimeRatio <= 0 || leaderLeaseTimeRatio >= 100) { + if (leaderLeaseTimeRatio <= 0 || leaderLeaseTimeRatio > 80) { throw new IllegalArgumentException("leaderLeaseTimeRatio: " + leaderLeaseTimeRatio - + " (expected: 0 < leaderLeaseTimeRatio < 100)"); + + " (expected: 0 < leaderLeaseTimeRatio <= 80)"); } this.leaderLeaseTimeRatio = leaderLeaseTimeRatio; } public int getLeaderLeaseTimeoutMs() { - return getElectionTimeoutMs() * getLeaderLeaseTimeRatio() / 100; + return this.electionTimeoutMs * this.leaderLeaseTimeRatio / 100; } public int getSnapshotIntervalSecs() { From d625f19007e59b57c8665cbebaf131ef21a45d60 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Tue, 12 Mar 2019 20:29:50 +0800 Subject: [PATCH 5/9] (fix) rename method name --- .../main/java/com/alipay/sofa/jraft/core/NodeImpl.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java index 34f4f7276..1c789754c 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java @@ -484,7 +484,7 @@ private void handleElectionTimeout() { if (this.state != State.STATE_FOLLOWER) { return; } - if (isElectionTimeout()) { + if (isCurrentLeaderValid()) { return; } final PeerId emptyId = new PeerId(); @@ -688,8 +688,8 @@ protected int adjustTimeout(int timeoutMs) { }; // To minimize the effects of clock drift, we should make sure that: // stepDownTimeoutMs + leaderLeaseTimeoutMs < electionTimeout - final int stepDownTimeoutMs = (this.options.getElectionTimeoutMs() * Math.max( - 100 - this.options.getLeaderLeaseTimeRatio(), 0)) >> 1; + final int stepDownTimeoutMs = (this.options.getElectionTimeoutMs() + * Math.max(100 - this.options.getLeaderLeaseTimeRatio(), 0) / 100) >> 1; this.stepDownTimer = new RepeatedTimer("JRaft-StepDownTimer", stepDownTimeoutMs) { @Override @@ -1320,7 +1320,7 @@ public Message handlePreVoteRequest(RequestVoteRequest request) { boolean granted = false; // noinspection ConstantConditions do { - if (this.leaderId != null && !this.leaderId.isEmpty() && isElectionTimeout()) { + if (this.leaderId != null && !this.leaderId.isEmpty() && isCurrentLeaderValid()) { LOG.info( "Node {} ignore PreVote from {} in term {} currTerm {}, because the leader {}'s lease is still valid.", this.getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, this.leaderId); @@ -1368,7 +1368,7 @@ private boolean isLeaderLeaseValid() { return Utils.monotonicMs() - this.lastLeaderTimestamp < this.options.getLeaderLeaseTimeoutMs(); } - private boolean isElectionTimeout() { + private boolean isCurrentLeaderValid() { return Utils.monotonicMs() - this.lastLeaderTimestamp < this.options.getElectionTimeoutMs(); } From 5e4716423cc640551ac59946866789740166efd6 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Tue, 12 Mar 2019 20:31:06 +0800 Subject: [PATCH 6/9] (fix) Integer -> Long --- .../src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java index 1c789754c..9158da1f7 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java @@ -1754,7 +1754,7 @@ private void checkDeadNodes(Configuration conf, long monotonicNowMs) { final List peers = conf.listPeers(); int aliveCount = 0; final Configuration deadNodes = new Configuration(); - long minLastRpcSendTimestamp = Integer.MAX_VALUE; + long minLastRpcSendTimestamp = Long.MAX_VALUE; for (final PeerId peer : peers) { if (peer.equals(this.serverId)) { aliveCount++; From 5c14ea3f0a99323c425b9034dc6a424b91c4a298 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Wed, 13 Mar 2019 10:45:49 +0800 Subject: [PATCH 7/9] (fix) check leader lease in readLeader() --- .../com/alipay/sofa/jraft/core/NodeImpl.java | 50 ++++++++++++++----- .../alipay/sofa/jraft/option/NodeOptions.java | 12 +++-- 2 files changed, 44 insertions(+), 18 deletions(-) diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java index 9158da1f7..b5c673972 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java @@ -126,7 +126,7 @@ public class NodeImpl implements Node, RaftServerService { private volatile State state; private volatile CountDownLatch shutdownLatch; private long currTerm; - private long lastLeaderTimestamp; + private volatile long lastLeaderTimestamp; private PeerId leaderId = new PeerId(); private PeerId votedId; private final Ballot voteCtx = new Ballot(); @@ -686,11 +686,7 @@ protected int adjustTimeout(int timeoutMs) { return randomTimeout(timeoutMs); } }; - // To minimize the effects of clock drift, we should make sure that: - // stepDownTimeoutMs + leaderLeaseTimeoutMs < electionTimeout - final int stepDownTimeoutMs = (this.options.getElectionTimeoutMs() - * Math.max(100 - this.options.getLeaderLeaseTimeRatio(), 0) / 100) >> 1; - this.stepDownTimer = new RepeatedTimer("JRaft-StepDownTimer", stepDownTimeoutMs) { + this.stepDownTimer = new RepeatedTimer("JRaft-StepDownTimer", this.options.getElectionTimeoutMs() >> 1) { @Override protected void onTrigger() { @@ -1364,8 +1360,36 @@ public Message handlePreVoteRequest(RequestVoteRequest request) { } } + // in read_lock private boolean isLeaderLeaseValid() { - return Utils.monotonicMs() - this.lastLeaderTimestamp < this.options.getLeaderLeaseTimeoutMs(); + final long monotonicNowMs = Utils.monotonicMs(); + final int leaderLeaseTimeoutMs = this.options.getLeaderLeaseTimeoutMs(); + final boolean isLeaseValid = monotonicNowMs - this.lastLeaderTimestamp < leaderLeaseTimeoutMs; + if (isLeaseValid) { + return true; + } + + final List peers = this.conf.getConf().getPeers(); + int aliveCount = 0; + long startLease = Long.MAX_VALUE; + for (final PeerId peer : peers) { + if (peer.equals(this.serverId)) { + aliveCount++; + continue; + } + final long lastRpcSendTimestamp = this.replicatorGroup.getLastRpcSendTimestamp(peer); + if (monotonicNowMs - lastRpcSendTimestamp <= leaderLeaseTimeoutMs) { + aliveCount++; + if (startLease > lastRpcSendTimestamp) { + startLease = lastRpcSendTimestamp; + } + } + } + if (aliveCount >= peers.size() / 2 + 1) { + this.updateLastLeaderTimestamp(startLease); + } + + return monotonicNowMs - this.lastLeaderTimestamp < leaderLeaseTimeoutMs; } private boolean isCurrentLeaderValid() { @@ -1754,18 +1778,18 @@ private void checkDeadNodes(Configuration conf, long monotonicNowMs) { final List peers = conf.listPeers(); int aliveCount = 0; final Configuration deadNodes = new Configuration(); - long minLastRpcSendTimestamp = Long.MAX_VALUE; + long startLease = Long.MAX_VALUE; for (final PeerId peer : peers) { if (peer.equals(this.serverId)) { aliveCount++; continue; } checkReplicator(peer); - final long lastRpcSendTimestamp = replicatorGroup.getLastRpcSendTimestamp(peer); - if (monotonicNowMs - lastRpcSendTimestamp <= options.getLeaderLeaseTimeoutMs()) { + final long lastRpcSendTimestamp = this.replicatorGroup.getLastRpcSendTimestamp(peer); + if (monotonicNowMs - lastRpcSendTimestamp <= this.options.getLeaderLeaseTimeoutMs()) { aliveCount++; - if (minLastRpcSendTimestamp > lastRpcSendTimestamp) { - minLastRpcSendTimestamp = lastRpcSendTimestamp; + if (startLease > lastRpcSendTimestamp) { + startLease = lastRpcSendTimestamp; } continue; } @@ -1773,7 +1797,7 @@ private void checkDeadNodes(Configuration conf, long monotonicNowMs) { } if (aliveCount >= peers.size() / 2 + 1) { - this.updateLastLeaderTimestamp(minLastRpcSendTimestamp); + this.updateLastLeaderTimestamp(startLease); return; } LOG.warn("Node {} term {} steps down when alive nodes don't satisfy quorum dead nodes: {} conf: {}", diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java index 000af7714..be9502722 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/NodeOptions.java @@ -36,9 +36,11 @@ public class NodeOptions extends RpcOptions { // Default: 1000 (1s) private int electionTimeoutMs = 1000; // follower to candidate timeout - // Leader lease time's ratio of electionTimeoutMs - // Default: 80, Max: 80 - private int leaderLeaseTimeRatio = 80; + // Leader lease time's ratio of electionTimeoutMs, + // To minimize the effects of clock drift, we should make that: + // clockDrift + leaderLeaseTimeoutMs < electionTimeout + // Default: 90, Max: 100 + private int leaderLeaseTimeRatio = 90; // A snapshot saving would be triggered every |snapshot_interval_s| seconds // if this was reset as a positive number @@ -191,9 +193,9 @@ public int getLeaderLeaseTimeRatio() { } public void setLeaderLeaseTimeRatio(int leaderLeaseTimeRatio) { - if (leaderLeaseTimeRatio <= 0 || leaderLeaseTimeRatio > 80) { + if (leaderLeaseTimeRatio <= 0 || leaderLeaseTimeRatio > 100) { throw new IllegalArgumentException("leaderLeaseTimeRatio: " + leaderLeaseTimeRatio - + " (expected: 0 < leaderLeaseTimeRatio <= 80)"); + + " (expected: 0 < leaderLeaseTimeRatio <= 100)"); } this.leaderLeaseTimeRatio = leaderLeaseTimeRatio; } From b208f674948c9ddac33995e43b0160d9ca21a3ab Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Wed, 13 Mar 2019 10:52:42 +0800 Subject: [PATCH 8/9] (fix) move up the 'peers check' --- .../src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java index b5c673972..c5513bf02 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java @@ -1244,11 +1244,10 @@ private void readLeader(ReadIndexRequest request, final ReadIndexResponse.Builde switch (readOnlyOpt) { case ReadOnlySafe: final List peers = this.conf.getConf().getPeers(); - final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure( - closure, respBuilder, quorum, peers.size()); - Requires.requireNonNull(peers, "Peer is null"); Requires.requireTrue(!peers.isEmpty(), "Peer is empty"); + final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure( + closure, respBuilder, quorum, peers.size()); // Send heartbeat requests to followers for (final PeerId peer : peers) { if (peer.equals(this.serverId)) { From ee1824de78bb5cf2be89761c580ed03113392d64 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Wed, 13 Mar 2019 15:33:46 +0800 Subject: [PATCH 9/9] (fix) add checkDeadNodes0 --- .../com/alipay/sofa/jraft/core/NodeImpl.java | 64 +++++++++---------- 1 file changed, 29 insertions(+), 35 deletions(-) diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java index c5513bf02..157807ae1 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java @@ -1362,33 +1362,15 @@ public Message handlePreVoteRequest(RequestVoteRequest request) { // in read_lock private boolean isLeaderLeaseValid() { final long monotonicNowMs = Utils.monotonicMs(); - final int leaderLeaseTimeoutMs = this.options.getLeaderLeaseTimeoutMs(); - final boolean isLeaseValid = monotonicNowMs - this.lastLeaderTimestamp < leaderLeaseTimeoutMs; - if (isLeaseValid) { + if (checkLeaderLease(monotonicNowMs)) { return true; } + checkDeadNodes0(this.conf.getConf().getPeers(), monotonicNowMs, false, null); + return checkLeaderLease(monotonicNowMs); + } - final List peers = this.conf.getConf().getPeers(); - int aliveCount = 0; - long startLease = Long.MAX_VALUE; - for (final PeerId peer : peers) { - if (peer.equals(this.serverId)) { - aliveCount++; - continue; - } - final long lastRpcSendTimestamp = this.replicatorGroup.getLastRpcSendTimestamp(peer); - if (monotonicNowMs - lastRpcSendTimestamp <= leaderLeaseTimeoutMs) { - aliveCount++; - if (startLease > lastRpcSendTimestamp) { - startLease = lastRpcSendTimestamp; - } - } - } - if (aliveCount >= peers.size() / 2 + 1) { - this.updateLastLeaderTimestamp(startLease); - } - - return monotonicNowMs - this.lastLeaderTimestamp < leaderLeaseTimeoutMs; + private boolean checkLeaderLease(long monotonicNowMs) { + return monotonicNowMs - this.lastLeaderTimestamp < this.options.getLeaderLeaseTimeoutMs(); } private boolean isCurrentLeaderValid() { @@ -1775,35 +1757,47 @@ private void onCaughtUp(PeerId peer, long term, long version, Status st) { private void checkDeadNodes(Configuration conf, long monotonicNowMs) { final List peers = conf.listPeers(); - int aliveCount = 0; final Configuration deadNodes = new Configuration(); + if (checkDeadNodes0(peers, monotonicNowMs, true, deadNodes)) { + return; + } + LOG.warn("Node {} term {} steps down when alive nodes don't satisfy quorum dead nodes: {} conf: {}", + getNodeId(), this.currTerm, deadNodes, conf); + final Status status = new Status(); + status.setError(RaftError.ERAFTTIMEDOUT, "Majority of the group dies: %d/%d", deadNodes.size(), peers.size()); + stepDown(this.currTerm, false, status); + } + + private boolean checkDeadNodes0(List peers, long monotonicNowMs, boolean checkReplicator, + Configuration deadNodes) { + final int leaderLeaseTimeoutMs = this.options.getLeaderLeaseTimeoutMs(); + int aliveCount = 0; long startLease = Long.MAX_VALUE; for (final PeerId peer : peers) { if (peer.equals(this.serverId)) { aliveCount++; continue; } - checkReplicator(peer); + if (checkReplicator) { + checkReplicator(peer); + } final long lastRpcSendTimestamp = this.replicatorGroup.getLastRpcSendTimestamp(peer); - if (monotonicNowMs - lastRpcSendTimestamp <= this.options.getLeaderLeaseTimeoutMs()) { + if (monotonicNowMs - lastRpcSendTimestamp <= leaderLeaseTimeoutMs) { aliveCount++; if (startLease > lastRpcSendTimestamp) { startLease = lastRpcSendTimestamp; } continue; } - deadNodes.addPeer(peer); + if (deadNodes != null) { + deadNodes.addPeer(peer); + } } - if (aliveCount >= peers.size() / 2 + 1) { this.updateLastLeaderTimestamp(startLease); - return; + return true; } - LOG.warn("Node {} term {} steps down when alive nodes don't satisfy quorum dead nodes: {} conf: {}", - getNodeId(), this.currTerm, deadNodes, conf); - final Status status = new Status(); - status.setError(RaftError.ERAFTTIMEDOUT, "Majority of the group dies: %d/%d", deadNodes.size(), peers.size()); - stepDown(this.currTerm, false, status); + return false; } private void handleStepDownTimeout() {