diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java index 6a3a4ff61..fdd98c9e8 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java @@ -97,6 +97,7 @@ public class Replicator implements ThreadId.OnError { private volatile long probeCounter = 0; private volatile long appendEntriesCounter = 0; private volatile long installSnapshotCounter = 0; + private volatile long blockCounter = 0; protected Stat statInfo = new Stat(); private ScheduledFuture blockTimer; @@ -199,6 +200,7 @@ public Map getMetrics() { gauges.put("heartbeat-times", (Gauge) () -> this.r.heartbeatCounter); gauges.put("install-snapshot-times", (Gauge) () -> this.r.installSnapshotCounter); gauges.put("probe-times", (Gauge) () -> this.r.probeCounter); + gauges.put("block-times", (Gauge) () -> this.r.blockCounter); gauges.put("append-entries-times", (Gauge) () -> this.r.appendEntriesCounter); return gauges; } @@ -714,7 +716,7 @@ static boolean onInstallSnapshotReturned(final ThreadId id, final Replicator r, sb.append(" error:").append(status); LOG.info(sb.toString()); notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status); - if (++r.consecutiveErrorTimes % 10 == 0) { + if ((r.consecutiveErrorTimes++) % 10 == 0) { LOG.warn("Fail to install snapshot at peer={}, error={}", r.options.getPeerId(), status); } success = false; @@ -947,7 +949,7 @@ public static void waitForCaughtUp(final ThreadId id, final long maxMargin, fina @Override public String toString() { return "Replicator [state=" + getState() + ", statInfo=" + this.statInfo + ", peerId=" - + this.options.getPeerId() + ", type=" + this.options.getReplicatorType() + "]"; + + this.options.getPeerId() + ", waitId=" + this.waitId + ", type=" + this.options.getReplicatorType() + "]"; } static void onBlockTimeoutInNewThread(final ThreadId id) { @@ -988,7 +990,7 @@ static boolean continueSending(final ThreadId id, final int errCode) { if (r == null) { return false; } - r.waitId = -1; + if (errCode == RaftError.ETIMEDOUT.getNumber()) { r.blockTimer = null; // Send empty entries after block timeout to check the correct @@ -997,6 +999,8 @@ static boolean continueSending(final ThreadId id, final int errCode) { // last_index of this followers is less than |next_index - 1| r.sendProbeRequest(); } else if (errCode != RaftError.ESTOP.getNumber()) { + // Only reset waitId before sending entries, fixed #842, #838 + r.waitId = -1; // id is unlock in _send_entries r.sendEntries(); } else { @@ -1021,6 +1025,7 @@ void block(final long startTimeMs, @SuppressWarnings("unused") final int errorCo unlockId(); return; } + this.blockCounter++; final long dueTime = startTimeMs + this.options.getDynamicHeartBeatTimeoutMs(); try { LOG.debug("Blocking {} for {} ms", this.options.getPeerId(), this.options.getDynamicHeartBeatTimeoutMs()); @@ -1192,7 +1197,7 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap } r.setState(State.Probe); notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status); - if (++r.consecutiveErrorTimes % 10 == 0) { + if ((r.consecutiveErrorTimes++) % 10 == 0) { LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(), r.consecutiveErrorTimes, status); } @@ -1425,7 +1430,7 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight LOG.debug(sb.toString()); } notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status); - if (++r.consecutiveErrorTimes % 10 == 0) { + if ((r.consecutiveErrorTimes++) % 10 == 0) { LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(), r.consecutiveErrorTimes, status); }