Fix/fix stale read #34

Merged: 9 commits, Mar 13, 2019 (changes shown from 7 commits)
@@ -93,7 +93,7 @@ public interface ReplicatorGroup {
boolean waitCaughtUp(PeerId peer, long maxMargin, long dueTime, CatchUpClosure done);

/**
* Get peer's last rpc send timestamp.
* Get peer's last rpc send timestamp (monotonic time in milliseconds).
*
* @param peer the peer of replicator
*/
83 changes: 64 additions & 19 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
@@ -67,6 +67,7 @@
import com.alipay.sofa.jraft.option.LogManagerOptions;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.option.ReadOnlyOption;
import com.alipay.sofa.jraft.option.ReadOnlyServiceOptions;
import com.alipay.sofa.jraft.option.ReplicatorGroupOptions;
import com.alipay.sofa.jraft.option.SnapshotExecutorOptions;
@@ -125,7 +126,7 @@ public class NodeImpl implements Node, RaftServerService {
private volatile State state;
private volatile CountDownLatch shutdownLatch;
private long currTerm;
private long lastLeaderTimestamp;
private volatile long lastLeaderTimestamp;
private PeerId leaderId = new PeerId();
private PeerId votedId;
private final Ballot voteCtx = new Ballot();
@@ -411,7 +412,7 @@ public NodeImpl(String groupId, PeerId serverId) {
this.serverId = serverId != null ? serverId.copy() : null;
this.state = State.STATE_UNINITIALIZED;
this.currTerm = 0;
this.updateLastLeaderTimestamp();
this.updateLastLeaderTimestamp(Utils.monotonicMs());
this.confCtx = new ConfigurationCtx(this);
this.wakingCandidate = null;
GLOBAL_NUM_NODES.incrementAndGet();
@@ -483,7 +484,7 @@ private void handleElectionTimeout() {
if (this.state != State.STATE_FOLLOWER) {
return;
}
if (isLeaderLeaseValid()) {
if (isCurrentLeaderValid()) {
return;
}
final PeerId emptyId = new PeerId();
@@ -975,7 +976,7 @@ private void stepDown(long term, boolean wakeupCandidate, Status status) {
// soft state in memory
this.state = State.STATE_FOLLOWER;
this.confCtx.reset();
this.updateLastLeaderTimestamp();
this.updateLastLeaderTimestamp(Utils.monotonicMs());
if (this.snapshotExecutor != null) {
snapshotExecutor.interruptDownloadingSnapshots(term);
}
@@ -1234,11 +1235,17 @@ private void readLeader(ReadIndexRequest request, final ReadIndexResponse.Builde
}
}

switch (this.raftOptions.getReadOnlyOptions()) {
ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions();
if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased && !isLeaderLeaseValid()) {
// If leader lease timeout, we must change option to ReadOnlySafe
readOnlyOpt = ReadOnlyOption.ReadOnlySafe;
}

switch (readOnlyOpt) {
case ReadOnlySafe:
final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure(
closure, respBuilder, quorum, this.conf.getConf().getPeers().size());
final List<PeerId> peers = this.conf.getConf().getPeers();
final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure(
closure, respBuilder, quorum, peers.size());

Requires.requireNonNull(peers, "Peer is null");
Requires.requireTrue(!peers.isEmpty(), "Peer is empty");
@@ -1309,7 +1316,7 @@ public Message handlePreVoteRequest(RequestVoteRequest request) {
boolean granted = false;
// noinspection ConstantConditions
do {
if (this.leaderId != null && !this.leaderId.isEmpty() && isLeaderLeaseValid()) {
if (this.leaderId != null && !this.leaderId.isEmpty() && isCurrentLeaderValid()) {
LOG.info(
"Node {} ignore PreVote from {} in term {} currTerm {}, because the leader {}'s lease is still valid.",
this.getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, this.leaderId);
@@ -1353,12 +1360,44 @@ public Message handlePreVoteRequest(RequestVoteRequest request) {
}
}

// in read_lock
private boolean isLeaderLeaseValid() {
final long monotonicNowMs = Utils.monotonicMs();
final int leaderLeaseTimeoutMs = this.options.getLeaderLeaseTimeoutMs();
final boolean isLeaseValid = monotonicNowMs - this.lastLeaderTimestamp < leaderLeaseTimeoutMs;
if (isLeaseValid) {
return true;
}

final List<PeerId> peers = this.conf.getConf().getPeers();
int aliveCount = 0;
long startLease = Long.MAX_VALUE;
Review comment (Contributor): This logic is very similar to part of the code in checkDeadNodes. Extract it into a shared method that takes a boolean checkReplicator parameter and performs the replicator check only when it is true; that way the common code can be reused. A sketch of this suggestion follows.
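One way the suggested extraction could look, as a minimal sketch only: the helper name checkQuorumAlive is hypothetical, the dead-node bookkeeping of checkDeadNodes is omitted, and the actual refactoring in this PR's later commits may differ.

    // Hypothetical shared helper for isLeaderLeaseValid() and checkDeadNodes():
    // counts peers whose last RPC send timestamp falls within the leader lease window,
    // optionally running the per-peer replicator check, and refreshes lastLeaderTimestamp
    // to the oldest timestamp in the quorum when a quorum is still alive.
    private boolean checkQuorumAlive(final Configuration conf, final long monotonicNowMs,
                                     final boolean checkReplicator) {
        final int leaderLeaseTimeoutMs = this.options.getLeaderLeaseTimeoutMs();
        final List<PeerId> peers = conf.listPeers();
        int aliveCount = 0;
        long startLease = Long.MAX_VALUE;
        for (final PeerId peer : peers) {
            if (peer.equals(this.serverId)) {
                aliveCount++;
                continue;
            }
            if (checkReplicator) {
                this.checkReplicator(peer);
            }
            final long lastRpcSendTimestamp = this.replicatorGroup.getLastRpcSendTimestamp(peer);
            if (monotonicNowMs - lastRpcSendTimestamp <= leaderLeaseTimeoutMs) {
                aliveCount++;
                if (startLease > lastRpcSendTimestamp) {
                    startLease = lastRpcSendTimestamp;
                }
            }
        }
        if (aliveCount >= peers.size() / 2 + 1) {
            this.updateLastLeaderTimestamp(startLease);
            return true;
        }
        return false;
    }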

for (final PeerId peer : peers) {
if (peer.equals(this.serverId)) {
aliveCount++;
continue;
}
final long lastRpcSendTimestamp = this.replicatorGroup.getLastRpcSendTimestamp(peer);
if (monotonicNowMs - lastRpcSendTimestamp <= leaderLeaseTimeoutMs) {
aliveCount++;
if (startLease > lastRpcSendTimestamp) {
startLease = lastRpcSendTimestamp;
}
}
}
if (aliveCount >= peers.size() / 2 + 1) {
this.updateLastLeaderTimestamp(startLease);
}

return monotonicNowMs - this.lastLeaderTimestamp < leaderLeaseTimeoutMs;
}

private boolean isCurrentLeaderValid() {
return Utils.monotonicMs() - this.lastLeaderTimestamp < this.options.getElectionTimeoutMs();
}

private void updateLastLeaderTimestamp() {
this.lastLeaderTimestamp = Utils.monotonicMs();
private void updateLastLeaderTimestamp(long lastLeaderTimestamp) {
this.lastLeaderTimestamp = lastLeaderTimestamp;
}

private void checkReplicator(PeerId candidateId) {
@@ -1553,7 +1592,7 @@ public Message handleAppendEntriesRequest(AppendEntriesRequest request, RpcReque
return responseBuilder.build();
}

this.updateLastLeaderTimestamp();
this.updateLastLeaderTimestamp(Utils.monotonicMs());

if (entriesCount > 0 && this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) {
LOG.warn("Node {} received AppendEntriesRequest while installing snapshot", getNodeId());
@@ -1717,7 +1756,8 @@ private void onCaughtUp(PeerId peer, long term, long version, Status st) {
}
// Retry if this peer is still alive
if (st.getCode() == RaftError.ETIMEDOUT.getNumber()
&& Utils.nowMs() - replicatorGroup.getLastRpcSendTimestamp(peer) <= options.getElectionTimeoutMs()) {
&& Utils.monotonicMs() - replicatorGroup.getLastRpcSendTimestamp(peer) <= options
.getElectionTimeoutMs()) {
LOG.debug("Node {} waits peer {} to catch up", getNodeId(), peer);
final OnCaughtUp caughtUp = new OnCaughtUp(this, term, peer, version);
final long dueTime = Utils.nowMs() + options.getElectionTimeoutMs();
@@ -1734,25 +1774,30 @@ private void onCaughtUp(PeerId peer, long term, long version, Status st) {
}
}

private void checkDeadNodes(Configuration conf, long nowMs) {
private void checkDeadNodes(Configuration conf, long monotonicNowMs) {
final List<PeerId> peers = conf.listPeers();
int aliveCount = 0;
final Configuration deadNodes = new Configuration();
long startLease = Long.MAX_VALUE;
for (final PeerId peer : peers) {
if (peer.equals(this.serverId)) {
aliveCount++;
continue;
}
this.checkReplicator(peer);
if (nowMs - replicatorGroup.getLastRpcSendTimestamp(peer) <= options.getElectionTimeoutMs()) {
checkReplicator(peer);
final long lastRpcSendTimestamp = this.replicatorGroup.getLastRpcSendTimestamp(peer);
if (monotonicNowMs - lastRpcSendTimestamp <= this.options.getLeaderLeaseTimeoutMs()) {
aliveCount++;
if (startLease > lastRpcSendTimestamp) {
startLease = lastRpcSendTimestamp;
}
continue;
}
deadNodes.addPeer(peer);
}

if (aliveCount >= peers.size() / 2 + 1) {
this.updateLastLeaderTimestamp();
this.updateLastLeaderTimestamp(startLease);
return;
}
LOG.warn("Node {} term {} steps down when alive nodes don't satisfy quorum dead nodes: {} conf: {}",
@@ -1769,10 +1814,10 @@ private void handleStepDownTimeout() {
LOG.debug("Node {} term {} stop stepdown timer state is {}", getNodeId(), this.currTerm, this.state);
return;
}
final long now = Utils.nowMs();
checkDeadNodes(this.conf.getConf(), now);
final long monotonicNowMs = Utils.monotonicMs();
checkDeadNodes(this.conf.getConf(), monotonicNowMs);
if (!conf.getOldConf().isEmpty()) {
checkDeadNodes(conf.getOldConf(), now);
checkDeadNodes(conf.getOldConf(), monotonicNowMs);
}
} finally {
writeLock.unlock();
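For context on how the lease-based read path above is reached, here is a hypothetical usage sketch. It assumes the public jraft API of this period: RaftOptions.setReadOnlyOptions(...), NodeOptions.setRaftOptions(...), and Node.readIndex(byte[], ReadIndexClosure). None of these calls appear in this diff, so treat the wiring as an assumption rather than part of the PR.

import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.option.ReadOnlyOption;

public final class LeaseReadExample {

    // Build node options that prefer lease-based linearizable reads; with this PR,
    // NodeImpl.readLeader() silently downgrades to ReadOnlySafe (a heartbeat round)
    // whenever the leader lease has expired, so stale reads are avoided either way.
    public static NodeOptions leaseReadOptions() {
        final RaftOptions raftOptions = new RaftOptions();
        raftOptions.setReadOnlyOptions(ReadOnlyOption.ReadOnlyLeaseBased); // assumed setter
        final NodeOptions nodeOptions = new NodeOptions();
        nodeOptions.setElectionTimeoutMs(1000);
        nodeOptions.setLeaderLeaseTimeRatio(90); // 900 ms lease window, see NodeOptions below
        nodeOptions.setRaftOptions(raftOptions); // assumed wiring
        return nodeOptions;
    }

    // Issue one linearizable read through the read-index protocol.
    public static void readOnce(final Node node, final byte[] requestContext) {
        node.readIndex(requestContext, new ReadIndexClosure() {
            @Override
            public void run(final Status status, final long index, final byte[] reqCtx) {
                if (status.isOk()) {
                    // Safe to serve the read from the local state machine once the
                    // applied index has reached the returned read index.
                }
            }
        });
    }
}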
24 changes: 12 additions & 12 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java
@@ -498,7 +498,7 @@ void installSnapshot() {
this.state = State.Snapshot;
// noinspection NonAtomicOperationOnVolatileField
this.installSnapshotCounter++;
final long sendTime = Utils.nowMs();
final long monotonicSendTimeMs = Utils.monotonicMs();
final int stateVersion = this.version;
final int seq = getAndIncrementReqSeq();
final Future<Message> rpcFuture = rpcService.installSnapshot(this.options.getPeerId().getEndpoint(),
@@ -507,7 +507,7 @@ void installSnapshot() {
@Override
public void run(Status status) {
onRpcReturned(id, RequestType.Snapshot, status, request, getResponse(), seq, stateVersion,
sendTime);
monotonicSendTimeMs);
}
});
addInflight(RequestType.Snapshot, this.nextIndex, 0, 0, seq, rpcFuture);
@@ -595,7 +595,7 @@ private void sendEmptyEntries(final boolean isHeartbeat,
return;
}
try {
final long sendTime = Utils.nowMs();
final long monotonicSendTimeMs = Utils.monotonicMs();
final AppendEntriesRequest request = rb.build();

if (isHeartbeat) {
@@ -610,7 +610,7 @@ private void sendEmptyEntries(final boolean isHeartbeat,

@Override
public void run(Status status) {
onHeartbeatReturned(id, status, request, getResponse(), sendTime);
onHeartbeatReturned(id, status, request, getResponse(), monotonicSendTimeMs);
}
};
}
@@ -631,7 +631,7 @@ public void run(Status status) {
@Override
public void run(Status status) {
onRpcReturned(id, RequestType.AppendEntries, status, request, getResponse(), seq,
stateVersion, sendTime);
stateVersion, monotonicSendTimeMs);
}

});
@@ -709,9 +709,8 @@ public static ThreadId start(ReplicatorOptions opts, RaftOptions raftOptions) {
r.id.lock();
LOG.info("Replicator={}@{} is started", r.id, r.options.getPeerId());
r.catchUpClosure = null;
final long now = Utils.nowMs();
r.lastRpcSendTimestamp = now;
r.startHeartbeatTimer(now);
r.lastRpcSendTimestamp = Utils.monotonicMs();
r.startHeartbeatTimer(Utils.nowMs());
//id.unlock in sendEmptyEntries
r.sendEmptyEntries(false);
return r.id;
@@ -985,7 +984,7 @@ static void onHeartbeatReturned(ThreadId id, Status status, AppendEntriesRequest
r.notifyOnCaughtUp(RaftError.EPERM.getNumber(), true);
r.destroy();
node.increaseTermTo(response.getTerm(), new Status(RaftError.EHIGHERTERMRESPONSE,
"Leader receives higher term hearbeat_response from peer:%s", r.options.getPeerId()));
"Leader receives higher term heartbeat_response from peer:%s", r.options.getPeerId()));
return;
}
if (isLogDebugEnabled) {
@@ -1145,7 +1144,7 @@ private static boolean onAppendEntriesReturned(ThreadId id, Inflight inflight, S
}
//record metrics
if (request.getEntriesCount() > 0) {
r.nodeMetrics.recordLatency("replicate-entries", Utils.nowMs() - rpcSendTime);
r.nodeMetrics.recordLatency("replicate-entries", Utils.monotonicMs() - rpcSendTime);
r.nodeMetrics.recordSize("replicate-entries-count", request.getEntriesCount());
r.nodeMetrics.recordSize("replicate-entries-bytes", request.getData() != null ? request.getData().size()
: 0);
@@ -1381,14 +1380,15 @@ private boolean sendEntries(final long nextSendingIndex) {
statInfo.lastLogIndex = rb.getPrevLogIndex() + rb.getEntriesCount();

final int v = this.version;
final long sendTimeMs = Utils.nowMs();
final long monotonicSendTimeMs = Utils.monotonicMs();
final int seq = getAndIncrementReqSeq();
final Future<Message> rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(),
request, -1, new RpcResponseClosureAdapter<AppendEntriesResponse>() {

@Override
public void run(Status status) {
onRpcReturned(id, RequestType.AppendEntries, status, request, getResponse(), seq, v, sendTimeMs);
onRpcReturned(id, RequestType.AppendEntries, status, request, getResponse(), seq, v,
monotonicSendTimeMs);
}

});
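The send timestamps above move from Utils.nowMs() (wall clock) to Utils.monotonicMs() because wall-clock time can jump backwards or forwards under NTP adjustment, which would corrupt lease and timeout arithmetic. A minimal self-contained illustration of the distinction, assuming a monotonic source backed by System.nanoTime(); the real implementation lives in jraft's Utils class and may differ.

import java.util.concurrent.TimeUnit;

public final class ClockExample {

    // Wall-clock milliseconds: comparable across machines, but can jump when the
    // system clock is stepped by NTP or an operator.
    static long nowMs() {
        return System.currentTimeMillis();
    }

    // Monotonic milliseconds: meaningless as an absolute value, but differences are
    // reliable within one process, which is what lease/timeout arithmetic needs.
    static long monotonicMs() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
    }

    public static void main(String[] args) throws InterruptedException {
        final long start = monotonicMs();
        Thread.sleep(50);
        // Elapsed time measured monotonically is immune to wall-clock changes.
        System.out.println("elapsed ms = " + (monotonicMs() - start));
    }
}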
@@ -36,6 +36,12 @@ public class NodeOptions extends RpcOptions {
// Default: 1000 (1s)
private int electionTimeoutMs = 1000; // follower to candidate timeout

// Leader lease time's ratio of electionTimeoutMs,
// To minimize the effects of clock drift, we should make that:
// clockDrift + leaderLeaseTimeoutMs < electionTimeout
// Default: 90, Max: 100
private int leaderLeaseTimeRatio = 90;

// A snapshot saving would be triggered every |snapshot_interval_s| seconds
// if this was reset as a positive number
// If |snapshot_interval_s| <= 0, the time based snapshot would be disabled.
@@ -182,6 +188,22 @@ public void setElectionTimeoutMs(int electionTimeoutMs) {
this.electionTimeoutMs = electionTimeoutMs;
}

public int getLeaderLeaseTimeRatio() {
return leaderLeaseTimeRatio;
}

public void setLeaderLeaseTimeRatio(int leaderLeaseTimeRatio) {
if (leaderLeaseTimeRatio <= 0 || leaderLeaseTimeRatio > 100) {
throw new IllegalArgumentException("leaderLeaseTimeRatio: " + leaderLeaseTimeRatio
+ " (expected: 0 < leaderLeaseTimeRatio <= 100)");
}
this.leaderLeaseTimeRatio = leaderLeaseTimeRatio;
}

public int getLeaderLeaseTimeoutMs() {
return this.electionTimeoutMs * this.leaderLeaseTimeRatio / 100;
}

public int getSnapshotIntervalSecs() {
return this.snapshotIntervalSecs;
}
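As a quick worked example of the getter above, using only the defaults shown in this diff: electionTimeoutMs = 1000 and leaderLeaseTimeRatio = 90 give a lease window of 1000 * 90 / 100 = 900 ms, leaving 100 ms of headroom so that clockDrift + leaderLeaseTimeoutMs < electionTimeoutMs holds.

import com.alipay.sofa.jraft.option.NodeOptions;

public final class LeaseTimeoutExample {
    public static void main(String[] args) {
        final NodeOptions opts = new NodeOptions();
        opts.setElectionTimeoutMs(1000);  // follower-to-candidate timeout (default)
        opts.setLeaderLeaseTimeRatio(90); // default ratio added in this PR
        // Prints 900: 1000 * 90 / 100, i.e. 100 ms of headroom for clock drift.
        System.out.println("leaderLeaseTimeoutMs = " + opts.getLeaderLeaseTimeoutMs());
    }
}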