ZOOKEEPER-4541 Ephemeral znode owned by closed session visible in 1 of 3 servers #1925

Closed · wants to merge 16 commits

Changes from 7 commits
@@ -24,6 +24,7 @@
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
@@ -55,6 +56,55 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req

private static final Request REQUEST_OF_DEATH = Request.requestOfDeath;

private static class FlushRequest extends Request {
private final CountDownLatch latch = new CountDownLatch(1);
public FlushRequest() {
super(null, 0, 0, 0, null, null);
}
}

private static final Request turnForwardingDelayOn = new Request(null, 0, 0, 0, null, null);
private static final Request turnForwardingDelayOff = new Request(null, 0, 0, 0, null, null);

private static class DelayingProcessor implements RequestProcessor, Flushable {
Contributor:
does this need to be a new processor? could we inline this into the SyncRequestProcessor?

Contributor Author:
Doesn't need to, but I found the SyncRequestProcessor to be complicated enough already, and this was a separate set of concerns, so I felt it was cleaner to put it in a separate processor.

private final RequestProcessor next;
private Queue<Request> delayed = null;
private DelayingProcessor(RequestProcessor next) {
this.next = next;
}
@Override
public void flush() throws IOException {
if (delayed == null && next instanceof Flushable) {
((Flushable) next).flush();
}
}
@Override
public void processRequest(Request request) throws RequestProcessorException {
if (delayed == null) {
next.processRequest(request);
} else {
delayed.add(request);
}
}
@Override
public void shutdown() {
next.shutdown();
}
private void close() {
if (delayed == null) {
delayed = new ArrayDeque<>();
}
}
private void open() throws RequestProcessorException {
if (delayed != null) {
for (Request request : delayed) {
next.processRequest(request);
}
delayed = null;
}
}
}

/** The number of log entries to log before starting a snapshot */
private static int snapCount = ZooKeeperServer.getSnapCount();

@@ -75,7 +125,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req

private final ZooKeeperServer zks;

private final RequestProcessor nextProcessor;
private final DelayingProcessor nextProcessor;

/**
* Transactions that have been written and are waiting to be flushed to
@@ -88,7 +138,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req
public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener());
this.zks = zks;
this.nextProcessor = nextProcessor;
this.nextProcessor = nextProcessor == null ? null : new DelayingProcessor(nextProcessor);
this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize());
}

@@ -174,6 +224,21 @@ public void run() {
break;
}

if (si == turnForwardingDelayOn) {
nextProcessor.close();
Contributor:
In the constructor of SyncRequestProcessor, nextProcessor may be null.
Could this cause an NPE in ObserverZooKeeperServer (with syncRequestProcessorEnabled=true)?

Contributor Author:
Only followers enqueue these special requests, so that can't happen. Observers don't ack txns, as far as I remember?

continue;
}
if (si == turnForwardingDelayOff) {
nextProcessor.open();
Contributor:
The naming here is confusing.
The intention is: on receiving the turn-forwarding-delay-off request, open the gate, then flush all pending requests to the downstream processor.
Does nextProcessor.open() open the gate, or turn the delay on?

Contributor Author:
open() opens the gate. What about startDelaying() and flushAndStopDelaying()?
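
For reference, a minimal self-contained sketch of the gate semantics being discussed, using the renaming floated above (startDelaying/flushAndStopDelaying are only a suggestion from this thread, the element type is a generic stand-in, and Consumer stands in for the downstream RequestProcessor; this is not the PR's code):

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

final class DelayingGateSketch<T> {

    private final Consumer<T> next;   // stands in for the downstream RequestProcessor
    private Queue<T> delayed = null;  // null means the gate is open

    DelayingGateSketch(Consumer<T> next) {
        this.next = next;
    }

    void process(T item) {
        if (delayed == null) {
            next.accept(item);  // gate open: forward immediately
        } else {
            delayed.add(item);  // gate closed: hold the item back
        }
    }

    void startDelaying() {            // was close(): shut the gate
        if (delayed == null) {
            delayed = new ArrayDeque<>();
        }
    }

    void flushAndStopDelaying() {     // was open(): drain held items, then reopen the gate
        if (delayed != null) {
            for (T item : delayed) {
                next.accept(item);
            }
            delayed = null;
        }
    }
}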

continue;
}

if (si instanceof FlushRequest) {
flush();
((FlushRequest) si).latch.countDown();
continue;
}

long startProcessTime = Time.currentElapsedTime();
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);

@@ -206,9 +271,7 @@ public void run() {
// and there are no pending flushes (writes), then just pass this to the next processor
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable) nextProcessor).flush();
}
nextProcessor.flush();
}
continue;
}
@@ -224,6 +287,17 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req
LOG.info("SyncRequestProcessor exited!");
}

/** Flushes all pending writes, and waits for this to complete. */
public void syncFlush() throws InterruptedException {
FlushRequest marker = new FlushRequest();
queuedRequests.add(marker);
marker.latch.await();
}

public void setDelayForwarding(boolean delayForwarding) {
queuedRequests.add(delayForwarding ? turnForwardingDelayOn : turnForwardingDelayOff);
}

private void flush() throws IOException, RequestProcessorException {
if (this.toFlush.isEmpty()) {
return;
@@ -244,9 +318,7 @@ private void flush() throws IOException, RequestProcessorException {
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
this.nextProcessor.processRequest(i);
}
if (this.nextProcessor instanceof Flushable) {
((Flushable) this.nextProcessor).flush();
}
nextProcessor.flush();
}
lastFlushTime = Time.currentElapsedTime();
}
--- next file ---
@@ -883,7 +883,7 @@ public synchronized void shutdown(boolean fullyShutDown) {
// * If we fetch a new snapshot from leader, the zkDb will be
// cleared anyway before loading the snapshot
try {
//This will fast forward the database to the latest recorded transactions
// This will fast-forward the database to the latest recorded transactions
zkDb.fastForwardDataBase();
} catch (IOException e) {
LOG.error("Error updating DB", e);
--- next file ---
@@ -155,11 +155,11 @@ protected void unregisterMetrics() {
}

@Override
public synchronized void shutdown() {
public synchronized void shutdown(boolean fullyShutDown) {

Contributor:
I'm a little worried that the modification here has an impact on the invoking chain.

Before modification: Leader.shutdown(String) -> LeaderZooKeeperServer.shutdown() -> ZooKeeperServer.shutdown()
After modification: Leader.shutdown() -> ZooKeeperServer.shutdown()

LeaderZooKeeperServer.shutdown is skipped and containerManager does not stop.

Contributor Author:
ZooKeeperServer.shutdown() only calls shutdown(false), which is implemented in LeaderZooKeeperServer, and which stops the containerManager. shutdown() isn't overridden anywhere anymore.
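
A minimal sketch of the delegation described above, i.e. why dropping the shutdown() override does not skip stopping the containerManager (the class and field names below are stand-ins, not the real ZooKeeper classes):

import java.util.concurrent.atomic.AtomicBoolean;

abstract class ServerSketch {
    // The no-arg overload only delegates, so subclasses only override shutdown(boolean).
    public final void shutdown() {
        shutdown(false);
    }

    public void shutdown(boolean fullyShutDown) {
        // base cleanup would happen here
    }
}

final class LeaderServerSketch extends ServerSketch {
    // Stand-in for containerManager, so the sketch stays self-contained.
    final AtomicBoolean containerManagerStopped = new AtomicBoolean(false);

    @Override
    public void shutdown(boolean fullyShutDown) {
        containerManagerStopped.set(true); // "containerManager.stop()" runs on every path
        super.shutdown(fullyShutDown);
    }
}

Whether the caller invokes shutdown() or shutdown(true), execution goes through the overridden shutdown(boolean), so the containerManager stand-in is always stopped.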

if (containerManager != null) {
containerManager.stop();
}
super.shutdown();
super.shutdown(fullyShutDown);
}

@Override
--- next file ---
@@ -555,6 +555,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
boolean syncSnapshot = false;
readPacket(qp);
Deque<Long> packetsCommitted = new ArrayDeque<>();
Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>();
Contributor:
Since you are adding a list that is quite parallel to packetsNotCommitted, can you add comments on how the two differ? (And perhaps why both are needed?)

Contributor Author:
Sure, will add some explanation!

Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
@@ -643,33 +644,36 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
self.setLastSeenQuorumVerifier(qv, true);
}

packetsNotLogged.add(pif);
packetsNotCommitted.add(pif);
break;
case Leader.COMMIT:
case Leader.COMMITANDACTIVATE:
pif = packetsNotCommitted.peekFirst();
if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) {
QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8));
boolean majorChange = self.processReconfig(
qv,
ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(),
true);
if (majorChange) {
throw new Exception("changes proposed in reconfig");
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn(
"Committing 0x{}, but next proposal is 0x{}",
Long.toHexString(qp.getZxid()),
Long.toHexString(pif.hdr.getZxid()));

Contributor:

I can't remember if this got fixed, but do we still have a gratuitous print of this warning when we first sync? (If so, it might be nice to fix since we are touching it.)

Contributor Author:

Plenty of them around, still :)
} else {
if (qp.getType() == Leader.COMMITANDACTIVATE) {
QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8));
boolean majorChange = self.processReconfig(
qv,
ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(),
true);
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
}
}
if (!writeToTxnLog) {
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn(
"Committing 0x{}, but next proposal is 0x{}",
Long.toHexString(qp.getZxid()),
Long.toHexString(pif.hdr.getZxid()));
} else {
if (!writeToTxnLog) {
zk.processTxn(pif.hdr, pif.rec);
packetsNotLogged.remove();
packetsNotCommitted.remove();
} else {
packetsNotCommitted.remove();
packetsCommitted.add(qp.getZxid());
}
} else {
packetsCommitted.add(qp.getZxid());
}
break;
case Leader.INFORM:
@@ -708,7 +712,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
// Apply to db directly if we haven't taken the snapshot
zk.processTxn(packet.hdr, packet.rec);
} else {
packetsNotCommitted.add(packet);
packetsNotLogged.add(packet);
packetsCommitted.add(qp.getZxid());
}

@@ -756,13 +760,21 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
zk.startupWithoutServing();
if (zk instanceof FollowerZooKeeperServer) {
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
fzk.syncProcessor.setDelayForwarding(true);
for (PacketInFlight p : packetsNotLogged) {
fzk.logRequest(p.hdr, p.rec, p.digest);
}
packetsNotCommitted.clear();
packetsNotLogged.clear();
Contributor:
is this one of the key bugs? did we need to keep packetsNotCommitted around?

Contributor Author:
This was the bug that would cause the learner to crash during sync, because it "forgot" a previous PROPOSAL on the NEWLEADER, and would then fail to match up that PROPOSAL with a later COMMIT, if one was sent during the sync.
In turn, this caused the learner to have to re-sync, which could trigger the same crash again if there was heavy concurrent write traffic, and it would also give duplicate series of transactions in the transaction logs, with resulting transaction digest mismatch on that server (but otherwise consistent data view).

So we need a separation of what's not yet written to the log, and what's not yet matched with a COMMIT, which is what these two queues are about.
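
A condensed, self-contained sketch of the queue discipline described here (names and types are simplified stand-ins, not the actual Learner code):

import java.util.ArrayDeque;
import java.util.Deque;

final class SyncQueuesSketch {

    static final class Proposal {
        final long zxid;
        Proposal(long zxid) {
            this.zxid = zxid;
        }
    }

    // Proposals received from the leader but not yet written to the transaction log.
    final Deque<Proposal> packetsNotLogged = new ArrayDeque<>();
    // Proposals received from the leader but not yet matched with a COMMIT.
    final Deque<Proposal> packetsNotCommitted = new ArrayDeque<>();

    void onProposal(Proposal p) {
        packetsNotLogged.add(p);     // still needs to reach the txn log
        packetsNotCommitted.add(p);  // still waiting for its COMMIT
    }

    void onCommit(long zxid) {
        Proposal head = packetsNotCommitted.peekFirst();
        if (head != null && head.zxid == zxid) {
            packetsNotCommitted.remove();  // matched, whether or not it has been logged yet
        }
    }

    void onAllHandedToSyncProcessor() {
        // Only the logging queue is cleared; a COMMIT that arrives later during sync must
        // still find its matching proposal in packetsNotCommitted.
        packetsNotLogged.clear();
    }
}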

fzk.syncProcessor.syncFlush();
}

writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);

if (zk instanceof FollowerZooKeeperServer) {
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
fzk.syncProcessor.setDelayForwarding(false);
fzk.syncProcessor.syncFlush();

Contributor:
What's the reason we need the second syncFlush?

Contributor Author:
To ensure consistent ordering of the UPTODATE ACK vs. the ACKs from PROPOSALs. The real leader doesn't care, but unit tests may.

}
break;
}
}
@@ -782,7 +794,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
// We need to log the stuff that came in between the snapshot and the uptodate
if (zk instanceof FollowerZooKeeperServer) {
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
for (PacketInFlight p : packetsNotLogged) {
fzk.logRequest(p.hdr, p.rec, p.digest);
}
for (Long zxid : packetsCommitted) {
@@ -792,7 +804,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
// Similar to follower, we need to log requests between the snapshot
// and UPTODATE
ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
for (PacketInFlight p : packetsNotLogged) {
Contributor:
It's not completely obvious which list to use here. I think a good comment on the semantics of the two lists is key.

Long zxid = packetsCommitted.peekFirst();
if (p.hdr.getZxid() != zxid) {
// log warning message if there is no matching commit
--- next file ---
@@ -152,24 +152,30 @@ protected void unregisterJMX(Learner peer) {
}

@Override
public synchronized void shutdown() {
public synchronized void shutdown(boolean fullyShutDown) {
if (!canShutdown()) {
LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
return;
} else {
LOG.info("Shutting down");
try {
if (syncProcessor != null) {
// Shutting down the syncProcessor first ensures that queued transactions are written to permanent
// storage, so that crash-recovery data is consistent with what is used for the leader election that
// immediately follows this shutdown (triggered by the old leader going down). It also ensures that
// any state on its way to being written is loaded by the potential call to fast-forward-from-edits
// in super.shutdown(...), so we avoid getting a DIFF from the new leader that contains entries we
// have already written to our transaction log.
syncProcessor.shutdown();
}
} catch (Exception e) {
LOG.warn("Ignoring unexpected exception in syncprocessor shutdown", e);
}
}
LOG.info("Shutting down");
try {
super.shutdown();
super.shutdown(fullyShutDown);
} catch (Exception e) {
LOG.warn("Ignoring unexpected exception during shutdown", e);
}
try {
if (syncProcessor != null) {
syncProcessor.shutdown();
}
} catch (Exception e) {
LOG.warn("Ignoring unexpected exception in syncprocessor shutdown", e);
}
}

}
--- next file ---
@@ -73,7 +73,7 @@ public Learner getLearner() {
* @param request
*/
public void commitRequest(Request request) {
if (syncRequestProcessorEnabled) {
if (syncProcessor != null) {
// Write to txnlog and take periodic snapshot
syncProcessor.processRequest(request);
}
@@ -106,6 +106,8 @@ protected void setupRequestProcessors() {
if (syncRequestProcessorEnabled) {
syncProcessor = new SyncRequestProcessor(this, null);
syncProcessor.start();
} else {
syncProcessor = null;
Contributor:
syncProcessor as an ObserverZooKeeperServer field should have a default value of null.
Does setting null here make a difference?

Contributor Author:
No, I'm just used to always assigning (to final fields). This can be removed again.

}
}

@@ -127,18 +129,6 @@ public String getState() {
return "observer";
}

@Override
public synchronized void shutdown() {
if (!canShutdown()) {
LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
return;
}
super.shutdown();
if (syncRequestProcessorEnabled && syncProcessor != null) {
syncProcessor.shutdown();
}
}

@Override
public void dumpMonitorValues(BiConsumer<String, Object> response) {
super.dumpMonitorValues(response);
--- next file ---
@@ -190,23 +190,23 @@ public long getServerId() {
}

@Override
public synchronized void shutdown() {
public synchronized void shutdown(boolean fullyShutDown) {
if (!canShutdown()) {
super.shutdown(fullyShutDown);
LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
return;
}
shutdown = true;
unregisterJMX(this);
} else {
shutdown = true;
unregisterJMX(this);

// set peer's server to null
self.setZooKeeperServer(null);
// clear all the connections
self.closeAllConnections();

self.adminServer.setZooKeeperServer(null);
// set peer's server to null
self.setZooKeeperServer(null);
// clear all the connections
self.closeAllConnections();

self.adminServer.setZooKeeperServer(null);
}
// shutdown the server itself
super.shutdown();
super.shutdown(fullyShutDown);
}

@Override