Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reset waitId unexpectedly #847

Merged
merged 2 commits into from
Jun 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public class Replicator implements ThreadId.OnError {
private volatile long probeCounter = 0;
private volatile long appendEntriesCounter = 0;
private volatile long installSnapshotCounter = 0;
private volatile long blockCounter = 0;
protected Stat statInfo = new Stat();
private ScheduledFuture<?> blockTimer;

Expand Down Expand Up @@ -199,6 +200,7 @@ public Map<String, Metric> getMetrics() {
gauges.put("heartbeat-times", (Gauge<Long>) () -> this.r.heartbeatCounter);
gauges.put("install-snapshot-times", (Gauge<Long>) () -> this.r.installSnapshotCounter);
gauges.put("probe-times", (Gauge<Long>) () -> this.r.probeCounter);
gauges.put("block-times", (Gauge<Long>) () -> this.r.blockCounter);
gauges.put("append-entries-times", (Gauge<Long>) () -> this.r.appendEntriesCounter);
return gauges;
}
Expand Down Expand Up @@ -714,7 +716,7 @@ static boolean onInstallSnapshotReturned(final ThreadId id, final Replicator r,
sb.append(" error:").append(status);
LOG.info(sb.toString());
notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
if (++r.consecutiveErrorTimes % 10 == 0) {
if ((r.consecutiveErrorTimes++) % 10 == 0) {
LOG.warn("Fail to install snapshot at peer={}, error={}", r.options.getPeerId(), status);
}
success = false;
Expand Down Expand Up @@ -947,7 +949,7 @@ public static void waitForCaughtUp(final ThreadId id, final long maxMargin, fina
@Override
public String toString() {
return "Replicator [state=" + getState() + ", statInfo=" + this.statInfo + ", peerId="
+ this.options.getPeerId() + ", type=" + this.options.getReplicatorType() + "]";
+ this.options.getPeerId() + ", waitId=" + this.waitId + ", type=" + this.options.getReplicatorType() + "]";
}

static void onBlockTimeoutInNewThread(final ThreadId id) {
Expand Down Expand Up @@ -988,7 +990,7 @@ static boolean continueSending(final ThreadId id, final int errCode) {
if (r == null) {
return false;
}
r.waitId = -1;

if (errCode == RaftError.ETIMEDOUT.getNumber()) {
r.blockTimer = null;
// Send empty entries after block timeout to check the correct
Expand All @@ -997,6 +999,8 @@ static boolean continueSending(final ThreadId id, final int errCode) {
// last_index of this followers is less than |next_index - 1|
r.sendProbeRequest();
} else if (errCode != RaftError.ESTOP.getNumber()) {
// Only reset waitId before sending entries, fixed #842, #838
r.waitId = -1;
// id is unlock in _send_entries
r.sendEntries();
} else {
Expand All @@ -1021,6 +1025,7 @@ void block(final long startTimeMs, @SuppressWarnings("unused") final int errorCo
unlockId();
return;
}
this.blockCounter++;
final long dueTime = startTimeMs + this.options.getDynamicHeartBeatTimeoutMs();
try {
LOG.debug("Blocking {} for {} ms", this.options.getPeerId(), this.options.getDynamicHeartBeatTimeoutMs());
Expand Down Expand Up @@ -1192,7 +1197,7 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap
}
r.setState(State.Probe);
notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
if (++r.consecutiveErrorTimes % 10 == 0) {
if ((r.consecutiveErrorTimes++) % 10 == 0) {
LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(),
r.consecutiveErrorTimes, status);
}
Expand Down Expand Up @@ -1425,7 +1430,7 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight
LOG.debug(sb.toString());
}
notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
if (++r.consecutiveErrorTimes % 10 == 0) {
if ((r.consecutiveErrorTimes++) % 10 == 0) {
LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(),
r.consecutiveErrorTimes, status);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void testMetricRemoveOnDestroy() {
assertNotNull(r);
assertSame(r.getOpts(), this.opts);
Set<String> metrics = this.opts.getNode().getNodeMetrics().getMetricRegistry().getNames();
assertEquals(7, metrics.size());
assertEquals(8, metrics.size());
r.destroy();
metrics = this.opts.getNode().getNodeMetrics().getMetricRegistry().getNames();
assertEquals(0, metrics.size());
Expand Down