Skip to content

Commit

Permalink
Handle COMMIT after NEWLEADER in DIFF sync without NPE
Browse files Browse the repository at this point in the history
  • Loading branch information
jonmv committed Oct 3, 2022
1 parent 83812a9 commit 4aab1fd
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
boolean syncSnapshot = false;
readPacket(qp);
Deque<Long> packetsCommitted = new ArrayDeque<>();
Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>();
Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
Expand Down Expand Up @@ -643,33 +644,36 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
self.setLastSeenQuorumVerifier(qv, true);
}

packetsNotLogged.add(pif);
packetsNotCommitted.add(pif);
break;
case Leader.COMMIT:
case Leader.COMMITANDACTIVATE:
pif = packetsNotCommitted.peekFirst();
if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) {
QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8));
boolean majorChange = self.processReconfig(
qv,
ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(),
true);
if (majorChange) {
throw new Exception("changes proposed in reconfig");
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn(
"Committing 0x{}, but next proposal is 0x{}",
Long.toHexString(qp.getZxid()),
Long.toHexString(pif.hdr.getZxid()));
} else {
if (qp.getType() == Leader.COMMITANDACTIVATE) {
QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8));
boolean majorChange = self.processReconfig(
qv,
ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(),
true);
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
}
}
if (!writeToTxnLog) {
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn(
"Committing 0x{}, but next proposal is 0x{}",
Long.toHexString(qp.getZxid()),
Long.toHexString(pif.hdr.getZxid()));
} else {
if (!writeToTxnLog) {
zk.processTxn(pif.hdr, pif.rec);
packetsNotLogged.remove();
packetsNotCommitted.remove();
} else {
packetsNotCommitted.remove();
packetsCommitted.add(qp.getZxid());
}
} else {
packetsCommitted.add(qp.getZxid());
}
break;
case Leader.INFORM:
Expand Down Expand Up @@ -708,7 +712,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
// Apply to db directly if we haven't taken the snapshot
zk.processTxn(packet.hdr, packet.rec);
} else {
packetsNotCommitted.add(packet);
packetsNotLogged.add(packet);
packetsCommitted.add(qp.getZxid());
}

Expand Down Expand Up @@ -756,10 +760,10 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
zk.startupWithoutServing();
if (zk instanceof FollowerZooKeeperServer) {
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
for (PacketInFlight p : packetsNotLogged) {
fzk.logRequest(p.hdr, p.rec, p.digest);
}
packetsNotCommitted.clear();
packetsNotLogged.clear();
}

writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
Expand All @@ -782,7 +786,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
// We need to log the stuff that came in between the snapshot and the uptodate
if (zk instanceof FollowerZooKeeperServer) {
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
for (PacketInFlight p : packetsNotLogged) {
fzk.logRequest(p.hdr, p.rec, p.digest);
}
for (Long zxid : packetsCommitted) {
Expand All @@ -792,7 +796,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
// Similar to follower, we need to log requests between the snapshot
// and UPTODATE
ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
for (PacketInFlight p : packetsNotLogged) {
Long zxid = packetsCommitted.peekFirst();
if (p.hdr.getZxid() != zxid) {
// log warning message if there is no matching commit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void flush() throws IOException {
LOG.warn("Closing connection to leader, exception during packet send", e);
try {
Socket socket = learner.sock;
if (socket != null && !learner.sock.isClosed()) {
if (socket != null && !socket.isClosed()) {
learner.sock.close();
}
} catch (IOException e1) {
Expand Down

0 comments on commit 4aab1fd

Please sign in to comment.