Skip to content

Commit

Permalink
fix: waitId may cause memory leak when replicator blocks on network is…
Browse files Browse the repository at this point in the history
…sue and improve log in replicator #842, #838
  • Loading branch information
killme2008 committed Jun 18, 2022
1 parent 8dc5c08 commit 6f6ecbb
Showing 1 changed file with 10 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public class Replicator implements ThreadId.OnError {
private volatile long probeCounter = 0;
private volatile long appendEntriesCounter = 0;
private volatile long installSnapshotCounter = 0;
private volatile long blockCounter = 0;
protected Stat statInfo = new Stat();
private ScheduledFuture<?> blockTimer;

Expand Down Expand Up @@ -199,6 +200,7 @@ public Map<String, Metric> getMetrics() {
gauges.put("heartbeat-times", (Gauge<Long>) () -> this.r.heartbeatCounter);
gauges.put("install-snapshot-times", (Gauge<Long>) () -> this.r.installSnapshotCounter);
gauges.put("probe-times", (Gauge<Long>) () -> this.r.probeCounter);
gauges.put("block-times", (Gauge<Long>) () -> this.r.blockCounter);
gauges.put("append-entries-times", (Gauge<Long>) () -> this.r.appendEntriesCounter);
return gauges;
}
Expand Down Expand Up @@ -714,7 +716,7 @@ static boolean onInstallSnapshotReturned(final ThreadId id, final Replicator r,
sb.append(" error:").append(status);
LOG.info(sb.toString());
notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
if (++r.consecutiveErrorTimes % 10 == 0) {
if ((r.consecutiveErrorTimes++) % 10 == 0) {
LOG.warn("Fail to install snapshot at peer={}, error={}", r.options.getPeerId(), status);
}
success = false;
Expand Down Expand Up @@ -947,7 +949,7 @@ public static void waitForCaughtUp(final ThreadId id, final long maxMargin, fina
@Override
public String toString() {
return "Replicator [state=" + getState() + ", statInfo=" + this.statInfo + ", peerId="
+ this.options.getPeerId() + ", type=" + this.options.getReplicatorType() + "]";
+ this.options.getPeerId() + ", waitId=" + this.waitId + ", type=" + this.options.getReplicatorType() + "]";
}

static void onBlockTimeoutInNewThread(final ThreadId id) {
Expand Down Expand Up @@ -988,7 +990,7 @@ static boolean continueSending(final ThreadId id, final int errCode) {
if (r == null) {
return false;
}
r.waitId = -1;

if (errCode == RaftError.ETIMEDOUT.getNumber()) {
r.blockTimer = null;
// Send empty entries after block timeout to check the correct
Expand All @@ -997,6 +999,8 @@ static boolean continueSending(final ThreadId id, final int errCode) {
// last_index of this followers is less than |next_index - 1|
r.sendProbeRequest();
} else if (errCode != RaftError.ESTOP.getNumber()) {
// Only reset waitId before sending entries, fixed #842, #838
r.waitId = -1;
// id is unlocked in sendEntries
r.sendEntries();
} else {
Expand All @@ -1021,6 +1025,7 @@ void block(final long startTimeMs, @SuppressWarnings("unused") final int errorCo
unlockId();
return;
}
this.blockCounter++;
final long dueTime = startTimeMs + this.options.getDynamicHeartBeatTimeoutMs();
try {
LOG.debug("Blocking {} for {} ms", this.options.getPeerId(), this.options.getDynamicHeartBeatTimeoutMs());
Expand Down Expand Up @@ -1192,7 +1197,7 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap
}
r.setState(State.Probe);
notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
if (++r.consecutiveErrorTimes % 10 == 0) {
if ((r.consecutiveErrorTimes++) % 10 == 0) {
LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(),
r.consecutiveErrorTimes, status);
}
Expand Down Expand Up @@ -1425,7 +1430,7 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight
LOG.debug(sb.toString());
}
notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
if (++r.consecutiveErrorTimes % 10 == 0) {
if ((r.consecutiveErrorTimes++) % 10 == 0) {
LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(),
r.consecutiveErrorTimes, status);
}
Expand Down

0 comments on commit 6f6ecbb

Please sign in to comment.