-
Notifications
You must be signed in to change notification settings - Fork 7.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ZOOKEEPER-4541 Ephemeral znode owned by closed session visible in 1 of 3 servers #1925
Changes from 14 commits
e670dac
0444ede
02d9f8a
96f2104
83812a9
4aab1fd
ed275cb
51663e1
47eb1e8
84be071
ffa326b
840808c
882d640
5f5834b
3483274
0507f7c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ | |
import java.util.Objects; | ||
import java.util.Queue; | ||
import java.util.concurrent.BlockingQueue; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
import java.util.concurrent.Semaphore; | ||
import java.util.concurrent.ThreadLocalRandom; | ||
|
@@ -55,6 +56,55 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req | |
|
||
private static final Request REQUEST_OF_DEATH = Request.requestOfDeath; | ||
|
||
private static class FlushRequest extends Request { | ||
private final CountDownLatch latch = new CountDownLatch(1); | ||
public FlushRequest() { | ||
super(null, 0, 0, 0, null, null); | ||
} | ||
} | ||
|
||
// Sentinel requests recognised by identity (==) in run(): dequeuing the first makes the
// sync thread call nextProcessor.close(), dequeuing the second makes it call
// nextProcessor.open(); in both cases the rest of that loop iteration is skipped.
// They are put on the queue by setDelayForwarding(boolean).
private static final Request turnForwardingDelayOn = new Request(null, 0, 0, 0, null, null); | ||
private static final Request turnForwardingDelayOff = new Request(null, 0, 0, 0, null, null); | ||
|
||
private static class DelayingProcessor implements RequestProcessor, Flushable { | ||
private final RequestProcessor next; | ||
private Queue<Request> delayed = null; | ||
private DelayingProcessor(RequestProcessor next) { | ||
this.next = next; | ||
} | ||
@Override | ||
public void flush() throws IOException { | ||
if (delayed == null && next instanceof Flushable) { | ||
((Flushable) next).flush(); | ||
} | ||
} | ||
@Override | ||
public void processRequest(Request request) throws RequestProcessorException { | ||
if (delayed == null) { | ||
next.processRequest(request); | ||
} else { | ||
delayed.add(request); | ||
} | ||
} | ||
@Override | ||
public void shutdown() { | ||
next.shutdown(); | ||
} | ||
private void close() { | ||
if (delayed == null) { | ||
delayed = new ArrayDeque<>(); | ||
} | ||
} | ||
private void open() throws RequestProcessorException { | ||
if (delayed != null) { | ||
for (Request request : delayed) { | ||
next.processRequest(request); | ||
} | ||
delayed = null; | ||
} | ||
} | ||
} | ||
|
||
/** The number of log entries to log before starting a snapshot */ | ||
private static int snapCount = ZooKeeperServer.getSnapCount(); | ||
|
||
|
@@ -75,7 +125,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req | |
|
||
private final ZooKeeperServer zks; | ||
|
||
private final RequestProcessor nextProcessor; | ||
private final DelayingProcessor nextProcessor; | ||
|
||
/** | ||
* Transactions that have been written and are waiting to be flushed to | ||
|
@@ -88,7 +138,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req | |
public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) { | ||
super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener()); | ||
this.zks = zks; | ||
this.nextProcessor = nextProcessor; | ||
this.nextProcessor = nextProcessor == null ? null : new DelayingProcessor(nextProcessor); | ||
this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize()); | ||
} | ||
|
||
|
@@ -174,6 +224,21 @@ public void run() { | |
break; | ||
} | ||
|
||
if (si == turnForwardingDelayOn) { | ||
nextProcessor.close(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In the constructor of SyncRequestProcessor, nextProcessor may be null. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Only followers enqueue these special requests, so that can't happen. Observers don't ack txns, as far as I remember? |
||
continue; | ||
} | ||
if (si == turnForwardingDelayOff) { | ||
nextProcessor.open(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This naming here is confusing. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
continue; | ||
} | ||
|
||
if (si instanceof FlushRequest) { | ||
flush(); | ||
((FlushRequest) si).latch.countDown(); | ||
continue; | ||
} | ||
|
||
long startProcessTime = Time.currentElapsedTime(); | ||
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime); | ||
|
||
|
@@ -206,9 +271,7 @@ public void run() { | |
// and there are no pending flushes (writes), then just pass this to the next processor | ||
if (nextProcessor != null) { | ||
nextProcessor.processRequest(si); | ||
if (nextProcessor instanceof Flushable) { | ||
((Flushable) nextProcessor).flush(); | ||
} | ||
nextProcessor.flush(); | ||
} | ||
continue; | ||
} | ||
|
@@ -224,6 +287,17 @@ public void run() { | |
LOG.info("SyncRequestProcessor exited!"); | ||
} | ||
|
||
/** Flushes all pending writes, and waits for this to complete. */ | ||
public void syncFlush() throws InterruptedException { | ||
FlushRequest marker = new FlushRequest(); | ||
queuedRequests.add(marker); | ||
marker.latch.await(); | ||
} | ||
|
||
public void setDelayForwarding(boolean delayForwarding) { | ||
queuedRequests.add(delayForwarding ? turnForwardingDelayOn : turnForwardingDelayOff); | ||
} | ||
|
||
private void flush() throws IOException, RequestProcessorException { | ||
if (this.toFlush.isEmpty()) { | ||
return; | ||
|
@@ -244,9 +318,7 @@ private void flush() throws IOException, RequestProcessorException { | |
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency); | ||
this.nextProcessor.processRequest(i); | ||
} | ||
if (this.nextProcessor instanceof Flushable) { | ||
((Flushable) this.nextProcessor).flush(); | ||
} | ||
nextProcessor.flush(); | ||
} | ||
lastFlushTime = Time.currentElapsedTime(); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -155,11 +155,11 @@ protected void unregisterMetrics() { | |
} | ||
|
||
@Override | ||
public synchronized void shutdown() { | ||
public synchronized void shutdown(boolean fullyShutDown) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm a little worried that the modification here has an impact on the invocation chain. Before modification: Leader.shutdown(String) -> LeaderZooKeeperServer.shutdown() -> ZooKeeperServer.shutdown(). LeaderZooKeeperServer.shutdown is skipped and containerManager does not stop. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
if (containerManager != null) { | ||
containerManager.stop(); | ||
} | ||
super.shutdown(); | ||
super.shutdown(fullyShutDown); | ||
} | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this need to be a new processor? could we inline this into the SyncRequestProcessor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't need to, but I found the `SyncRequestProcessor` to be complicated enough already, and this was a separate set of concerns, so I felt it was cleaner to put it in a separate processor.