Skip to content

Commit

Permalink
fix: reset waitId unexpectedly (#847)
Browse files Browse the repository at this point in the history
* fix: reset waitId unexpectedly when replicator blocks on network issue and improve log in replciator #842, #838

* test: fix testMetricRemoveOnDestroy
  • Loading branch information
killme2008 committed Jun 20, 2022
1 parent 27016a4 commit aec90ee
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public class Replicator implements ThreadId.OnError {
private volatile long probeCounter = 0;
private volatile long appendEntriesCounter = 0;
private volatile long installSnapshotCounter = 0;
private volatile long blockCounter = 0;
protected Stat statInfo = new Stat();
private ScheduledFuture<?> blockTimer;

Expand Down Expand Up @@ -199,6 +200,7 @@ public Map<String, Metric> getMetrics() {
gauges.put("heartbeat-times", (Gauge<Long>) () -> this.r.heartbeatCounter);
gauges.put("install-snapshot-times", (Gauge<Long>) () -> this.r.installSnapshotCounter);
gauges.put("probe-times", (Gauge<Long>) () -> this.r.probeCounter);
gauges.put("block-times", (Gauge<Long>) () -> this.r.blockCounter);
gauges.put("append-entries-times", (Gauge<Long>) () -> this.r.appendEntriesCounter);
return gauges;
}
Expand Down Expand Up @@ -714,7 +716,7 @@ static boolean onInstallSnapshotReturned(final ThreadId id, final Replicator r,
sb.append(" error:").append(status);
LOG.info(sb.toString());
notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
if (++r.consecutiveErrorTimes % 10 == 0) {
if ((r.consecutiveErrorTimes++) % 10 == 0) {
LOG.warn("Fail to install snapshot at peer={}, error={}", r.options.getPeerId(), status);
}
success = false;
Expand Down Expand Up @@ -947,7 +949,7 @@ public static void waitForCaughtUp(final ThreadId id, final long maxMargin, fina
@Override
public String toString() {
return "Replicator [state=" + getState() + ", statInfo=" + this.statInfo + ", peerId="
+ this.options.getPeerId() + ", type=" + this.options.getReplicatorType() + "]";
+ this.options.getPeerId() + ", waitId=" + this.waitId + ", type=" + this.options.getReplicatorType() + "]";
}

static void onBlockTimeoutInNewThread(final ThreadId id) {
Expand Down Expand Up @@ -988,7 +990,7 @@ static boolean continueSending(final ThreadId id, final int errCode) {
if (r == null) {
return false;
}
r.waitId = -1;

if (errCode == RaftError.ETIMEDOUT.getNumber()) {
r.blockTimer = null;
// Send empty entries after block timeout to check the correct
Expand All @@ -997,6 +999,8 @@ static boolean continueSending(final ThreadId id, final int errCode) {
// last_index of this followers is less than |next_index - 1|
r.sendProbeRequest();
} else if (errCode != RaftError.ESTOP.getNumber()) {
// Only reset waitId before sending entries, fixed #842, #838
r.waitId = -1;
// id is unlock in _send_entries
r.sendEntries();
} else {
Expand All @@ -1021,6 +1025,7 @@ void block(final long startTimeMs, @SuppressWarnings("unused") final int errorCo
unlockId();
return;
}
this.blockCounter++;
final long dueTime = startTimeMs + this.options.getDynamicHeartBeatTimeoutMs();
try {
LOG.debug("Blocking {} for {} ms", this.options.getPeerId(), this.options.getDynamicHeartBeatTimeoutMs());
Expand Down Expand Up @@ -1192,7 +1197,7 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap
}
r.setState(State.Probe);
notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
if (++r.consecutiveErrorTimes % 10 == 0) {
if ((r.consecutiveErrorTimes++) % 10 == 0) {
LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(),
r.consecutiveErrorTimes, status);
}
Expand Down Expand Up @@ -1425,7 +1430,7 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight
LOG.debug(sb.toString());
}
notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
if (++r.consecutiveErrorTimes % 10 == 0) {
if ((r.consecutiveErrorTimes++) % 10 == 0) {
LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(),
r.consecutiveErrorTimes, status);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void testMetricRemoveOnDestroy() {
assertNotNull(r);
assertSame(r.getOpts(), this.opts);
Set<String> metrics = this.opts.getNode().getNodeMetrics().getMetricRegistry().getNames();
assertEquals(7, metrics.size());
assertEquals(8, metrics.size());
r.destroy();
metrics = this.opts.getNode().getNodeMetrics().getMetricRegistry().getNames();
assertEquals(0, metrics.size());
Expand Down

0 comments on commit aec90ee

Please sign in to comment.