From 128c52eeffeb0d009aedc3b92e8bd30852332a09 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Sun, 5 Mar 2023 17:02:57 -0800 Subject: [PATCH] Simplified the logic for ForceWriteThread after we introduced queue.drainTo() (#3830) ### Motivation In #3545 we have switched the `ForceWriteThread` to take advantage of the `BlockingQueue.drainTo()` method for reducing contention, though the core logic of the force-write was not touched at the time. The logic of force-write is quite complicated because it tries to group multiple force-write requests in the queue by sending a new marker and grouping them when the marker is received. This also leads to a bit of lag when there are many requests coming in and the IO is stressed, as we're waiting a bit more before issuing the fsync. Instead, with the `drainTo()` approach we can greatly simplify the logic and maintain a strict fsync grouping: 1. drain all the force-write-requests available in the queue into a local array list 2. perform the fsync 3. update the journal log mark to the position of the last fw request 4. trigger send-responses for all the requests 5. go back to read from the queue This refactoring will also enable further improvements, to optimize how the send responses are prepared, since we now have a list of responses ready to send. 
--- .../bookie/BookKeeperServerStats.java | 4 - .../org/apache/bookkeeper/bookie/Journal.java | 186 +++++------------- .../bookkeeper/bookie/stats/JournalStats.java | 30 --- .../apache/bookkeeper/test/OpStatTest.java | 2 - 4 files changed, 45 insertions(+), 177 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java index 888577d4580..d4657d20362 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java @@ -109,7 +109,6 @@ public interface BookKeeperServerStats { String JOURNAL_FORCE_WRITE_ENQUEUE = "JOURNAL_FORCE_WRITE_ENQUEUE"; String JOURNAL_FORCE_WRITE_BATCH_ENTRIES = "JOURNAL_FORCE_WRITE_BATCH_ENTRIES"; String JOURNAL_FORCE_WRITE_BATCH_BYTES = "JOURNAL_FORCE_WRITE_BATCH_BYTES"; - String JOURNAL_FORCE_WRITE_GROUPING_FAILURES = "JOURNAL_FORCE_WRITE_GROUPING_FAILURES"; String JOURNAL_FLUSH_LATENCY = "JOURNAL_FLUSH_LATENCY"; String JOURNAL_QUEUE_LATENCY = "JOURNAL_QUEUE_LATENCY"; String JOURNAL_QUEUE_MAX_SIZE = "JOURNAL_QUEUE_MAX_SIZE"; @@ -158,9 +157,6 @@ public interface BookKeeperServerStats { String INDEX_INMEM_ILLEGAL_STATE_RESET = "INDEX_INMEM_ILLEGAL_STATE_RESET"; String INDEX_INMEM_ILLEGAL_STATE_DELETE = "INDEX_INMEM_ILLEGAL_STATE_DELETE"; String JOURNAL_FORCE_WRITE_QUEUE_SIZE = "JOURNAL_FORCE_WRITE_QUEUE_SIZE"; - String JOURNAL_CB_QUEUE_SIZE = "JOURNAL_CB_QUEUE_SIZE"; - String CB_THREAD_POOL_QUEUE_SIZE = "CB_THREAD_POOL_QUEUE_SIZE"; - String JOURNAL_CB_QUEUED_LATENCY = "JOURNAL_CB_QUEUED_LATENCY"; String JOURNAL_NUM_FLUSH_EMPTY_QUEUE = "JOURNAL_NUM_FLUSH_EMPTY_QUEUE"; String JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES = "JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES"; String JOURNAL_NUM_FLUSH_MAX_WAIT = "JOURNAL_NUM_FLUSH_MAX_WAIT"; diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index a71d4fb2c89..d14d5cb8a5f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -299,19 +299,14 @@ static class QueueEntry implements Runnable { WriteCallback cb; Object ctx; long enqueueTime; - long enqueueCbThreadPooleQueueTime; boolean ackBeforeSync; OpStatsLogger journalAddEntryStats; - OpStatsLogger journalCbQueuedLatency; - Counter journalCbQueueSize; - Counter cbThreadPoolQueueSize; Counter callbackTime; static QueueEntry create(ByteBuf entry, boolean ackBeforeSync, long ledgerId, long entryId, WriteCallback cb, Object ctx, long enqueueTime, OpStatsLogger journalAddEntryStats, - Counter journalCbQueueSize, Counter cbThreadPoolQueueSize, - OpStatsLogger journalCbQueuedLatency, Counter callbackTime) { + Counter callbackTime) { QueueEntry qe = RECYCLER.get(); qe.entry = entry; qe.ackBeforeSync = ackBeforeSync; @@ -321,27 +316,16 @@ static QueueEntry create(ByteBuf entry, boolean ackBeforeSync, long ledgerId, lo qe.entryId = entryId; qe.enqueueTime = enqueueTime; qe.journalAddEntryStats = journalAddEntryStats; - qe.journalCbQueuedLatency = journalCbQueuedLatency; - qe.journalCbQueueSize = journalCbQueueSize; - qe.cbThreadPoolQueueSize = cbThreadPoolQueueSize; qe.callbackTime = callbackTime; return qe; } - public void setEnqueueCbThreadPooleQueueTime(long enqueueCbThreadPooleQueueTime) { - this.enqueueCbThreadPooleQueueTime = enqueueCbThreadPooleQueueTime; - } - @Override public void run() { - journalCbQueuedLatency.registerSuccessfulEvent( - MathUtils.elapsedNanos(enqueueCbThreadPooleQueueTime), TimeUnit.NANOSECONDS); long startTime = System.nanoTime(); if (LOG.isDebugEnabled()) { LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId); } - journalCbQueueSize.dec(); - 
cbThreadPoolQueueSize.dec(); journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS); cb.writeComplete(0, ledgerId, entryId, null, ctx); callbackTime.addLatency(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); @@ -366,9 +350,6 @@ private void recycle() { this.cb = null; this.ctx = null; this.journalAddEntryStats = null; - this.journalCbQueuedLatency = null; - this.journalCbQueueSize = null; - this.cbThreadPoolQueueSize = null; this.callbackTime = null; recyclerHandle.recycle(this); } @@ -378,51 +359,25 @@ private void recycle() { * Token which represents the need to force a write to the Journal. */ @VisibleForTesting - public class ForceWriteRequest { + public static class ForceWriteRequest { private JournalChannel logFile; private RecyclableArrayList forceWriteWaiters; private boolean shouldClose; - private boolean isMarker; private long lastFlushedPosition; private long logId; - private long enqueueTime; - public int process(boolean shouldForceWrite) throws IOException { - journalStats.getForceWriteQueueSize().dec(); - journalStats.getFwEnqueueTimeStats() - .registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS); - - if (isMarker) { - return 0; - } + public int process() { + closeFileIfNecessary(); - long startTime = MathUtils.nowInNano(); - try { - if (shouldForceWrite) { - this.logFile.forceWrite(false); - journalStats.getJournalSyncStats() - .registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); - } - lastLogMark.setCurLogMark(this.logId, this.lastFlushedPosition); - - // Notify the waiters that the force write succeeded - for (int i = 0; i < forceWriteWaiters.size(); i++) { - QueueEntry qe = forceWriteWaiters.get(i); - if (qe != null) { - qe.setEnqueueCbThreadPooleQueueTime(MathUtils.nowInNano()); - journalStats.getCbThreadPoolQueueSize().inc(); - qe.run(); - } + // Notify the waiters that the force write succeeded + for (int i = 0; i < 
forceWriteWaiters.size(); i++) { + QueueEntry qe = forceWriteWaiters.get(i); + if (qe != null) { + qe.run(); } - - return forceWriteWaiters.size(); - } catch (IOException e) { - journalStats.getJournalSyncStats() - .registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); - throw e; - } finally { - closeFileIfNecessary(); } + + return forceWriteWaiters.size(); } public void closeFileIfNecessary() { @@ -431,6 +386,7 @@ public void closeFileIfNecessary() { // We should guard against exceptions so its // safe to call in catch blocks try { + logFile.forceWrite(false); logFile.close(); // Call close only once shouldClose = false; @@ -460,21 +416,18 @@ private ForceWriteRequest createForceWriteRequest(JournalChannel logFile, long logId, long lastFlushedPosition, RecyclableArrayList forceWriteWaiters, - boolean shouldClose, - boolean isMarker) { + boolean shouldClose) { ForceWriteRequest req = forceWriteRequestsRecycler.get(); req.forceWriteWaiters = forceWriteWaiters; req.logFile = logFile; req.logId = logId; req.lastFlushedPosition = lastFlushedPosition; req.shouldClose = shouldClose; - req.isMarker = isMarker; - req.enqueueTime = MathUtils.nowInNano(); journalStats.getForceWriteQueueSize().inc(); return req; } - private final Recycler forceWriteRequestsRecycler = new Recycler() { + private static final Recycler forceWriteRequestsRecycler = new Recycler() { @Override protected ForceWriteRequest newObject( Recycler.Handle handle) { @@ -495,10 +448,6 @@ private class ForceWriteThread extends BookieCriticalThread { private final boolean enableGroupForceWrites; private final Counter forceWriteThreadTime; - boolean shouldForceWrite = true; - int numReqInLastForceWrite = 0; - boolean forceWriteMarkerSent = false; - public ForceWriteThread(Thread threadToNotifyOnEx, boolean enableGroupForceWrites, StatsLogger statsLogger) { @@ -520,12 +469,11 @@ public void run() { } } - long busyStartTime = System.nanoTime(); - - List localRequests = new ArrayList<>(); + 
final List localRequests = new ArrayList<>(); while (running) { try { + int numReqInLastForceWrite = 0; int requestsCount = forceWriteRequests.drainTo(localRequests); if (requestsCount == 0) { @@ -534,11 +482,26 @@ public void run() { requestsCount = 1; } + journalStats.getForceWriteQueueSize().addCount(-requestsCount); + + // Sync and mark the journal up to the position of the last entry in the batch + ForceWriteRequest lastRequest = localRequests.get(requestsCount - 1); + syncJournal(lastRequest); + + // All the requests in the batch are now fully-synced. We can trigger sending the + // responses for (int i = 0; i < requestsCount; i++) { - forceWriteThreadTime.addLatency(MathUtils.elapsedNanos(busyStartTime), TimeUnit.NANOSECONDS); - processForceWriteRequest(localRequests.get(i)); - busyStartTime = System.nanoTime(); + ForceWriteRequest req = localRequests.get(i); + numReqInLastForceWrite += req.process(); + req.recycle(); } + + journalStats.getForceWriteGroupingCountStats() + .registerSuccessfulValue(numReqInLastForceWrite); + + } catch (IOException ioe) { + LOG.error("I/O exception in ForceWrite thread", ioe); + running = false; } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.info("ForceWrite thread interrupted"); @@ -559,66 +522,17 @@ public void run() { threadToNotifyOnEx.interrupt(); } - private void processForceWriteRequest(ForceWriteRequest req) { + private void syncJournal(ForceWriteRequest lastRequest) throws IOException { + long fsyncStartTime = MathUtils.nowInNano(); try { - // Force write the file and then notify the write completions - // - if (!req.isMarker) { - if (shouldForceWrite) { - // if we are going to force write, any request that is already in the - // queue will benefit from this force write - post a marker prior to issuing - // the flush so until this marker is encountered we can skip the force write - if (enableGroupForceWrites) { - ForceWriteRequest marker = - createForceWriteRequest(req.logFile, 0, 0, null, 
false, true); - forceWriteMarkerSent = forceWriteRequests.offer(marker); - if (!forceWriteMarkerSent) { - marker.recycle(); - Counter failures = journalStats.getForceWriteGroupingFailures(); - failures.inc(); - LOG.error( - "Fail to send force write grouping marker," - + " Journal.forceWriteRequests queue(capacity {}) is full," - + " current failure counter is {}.", - conf.getJournalQueueSize(), failures.get()); - } - } - - // If we are about to issue a write, record the number of requests in - // the last force write and then reset the counter so we can accumulate - // requests in the write we are about to issue - if (numReqInLastForceWrite > 0) { - journalStats.getForceWriteGroupingCountStats() - .registerSuccessfulValue(numReqInLastForceWrite); - numReqInLastForceWrite = 0; - } - } - } - numReqInLastForceWrite += req.process(shouldForceWrite); - - if (enableGroupForceWrites - // if its a marker we should switch back to flushing - && !req.isMarker - // If group marker sending failed, we can't figure out which writes are - // grouped in this force write. So, abandon it even if other writes could - // be grouped. This should be extremely rare as, usually, queue size is - // large enough to accommodate high flush frequencies. 
- && forceWriteMarkerSent - // This indicates that this is the last request in a given file - // so subsequent requests will go to a different file so we should - // flush on the next request - && !req.shouldClose) { - shouldForceWrite = false; - } else { - shouldForceWrite = true; - } - } catch (IOException ioe) { - LOG.error("I/O exception in ForceWrite thread", ioe); - running = false; - } finally { - if (req != null) { - req.recycle(); - } + lastRequest.logFile.forceWrite(false); + journalStats.getJournalSyncStats().registerSuccessfulEvent(MathUtils.elapsedNanos(fsyncStartTime), + TimeUnit.NANOSECONDS); + lastLogMark.setCurLogMark(lastRequest.logId, lastRequest.lastFlushedPosition); + } catch (IOException ioe) { + journalStats.getJournalSyncStats() + .registerFailedEvent(MathUtils.elapsedNanos(fsyncStartTime), TimeUnit.NANOSECONDS); + throw ioe; } } @@ -972,16 +886,12 @@ public void logAddEntry(long ledgerId, long entryId, ByteBuf entry, entry.retain(); journalStats.getJournalQueueSize().inc(); - journalStats.getJournalCbQueueSize().inc(); memoryLimitController.reserveMemory(entry.readableBytes()); queue.put(QueueEntry.create( entry, ackBeforeSync, ledgerId, entryId, cb, ctx, MathUtils.nowInNano(), journalStats.getJournalAddEntryStats(), - journalStats.getJournalCbQueueSize(), - journalStats.getCbThreadPoolQueueSize(), - journalStats.getJournalCbQueuedLatency(), callbackTime)); } @@ -990,13 +900,9 @@ void forceLedger(long ledgerId, WriteCallback cb, Object ctx) { null, false /* ackBeforeSync */, ledgerId, BookieImpl.METAENTRY_ID_FORCE_LEDGER, cb, ctx, MathUtils.nowInNano(), journalStats.getJournalForceLedgerStats(), - journalStats.getJournalCbQueueSize(), - journalStats.getCbThreadPoolQueueSize(), - journalStats.getJournalCbQueuedLatency(), callbackTime)); // Increment afterwards because the add operation could fail. 
journalStats.getJournalQueueSize().inc(); - journalStats.getJournalCbQueueSize().inc(); } /** @@ -1185,8 +1091,6 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(), if (entry != null && (!syncData || entry.ackBeforeSync)) { toFlush.set(i, null); numEntriesToFlush--; - entry.setEnqueueCbThreadPooleQueueTime(MathUtils.nowInNano()); - journalStats.getCbThreadPoolQueueSize().inc(); entry.run(); } } @@ -1225,7 +1129,7 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(), || (System.currentTimeMillis() - lastFlushTimeMs >= journalPageCacheFlushIntervalMSec)) { forceWriteRequests.put(createForceWriteRequest(logFile, logId, lastFlushPosition, - toFlush, shouldRolloverJournal, false)); + toFlush, shouldRolloverJournal)); lastFlushTimeMs = System.currentTimeMillis(); } toFlush = entryListRecycler.newInstance(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java index ed2aab95246..2396acb1f76 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/JournalStats.java @@ -21,11 +21,8 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CB_THREAD_POOL_QUEUE_SIZE; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_ADD_ENTRY; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_CB_QUEUED_LATENCY; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_CB_QUEUE_SIZE; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_CREATION_LATENCY; import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FLUSH_LATENCY; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_LEDGER; @@ -33,7 +30,6 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_BATCH_ENTRIES; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_ENQUEUE; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_GROUPING_COUNT; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_GROUPING_FAILURES; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_QUEUE_SIZE; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_MEMORY_MAX; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_MEMORY_USED; @@ -134,11 +130,6 @@ public class JournalStats { help = "The distribution of number of bytes grouped together into a force write request" ) private final OpStatsLogger forceWriteBatchBytesStats; - @StatsDoc( - name = JOURNAL_FORCE_WRITE_GROUPING_FAILURES, - help = "The number of force write grouping failures" - ) - private final Counter forceWriteGroupingFailures; @StatsDoc( name = JOURNAL_QUEUE_SIZE, help = "The journal queue size" @@ -149,23 +140,6 @@ public class JournalStats { help = "The force write queue size" ) private final Counter forceWriteQueueSize; - @StatsDoc( - name = JOURNAL_CB_QUEUE_SIZE, - help = "The journal callback queue size" - ) - private final Counter journalCbQueueSize; - - @StatsDoc( - name = CB_THREAD_POOL_QUEUE_SIZE, - help = "The queue size of cbThreadPool" - ) - private final Counter cbThreadPoolQueueSize; - - @StatsDoc( - name = JOURNAL_CB_QUEUED_LATENCY, - help = "The journal callback queued latency" - ) - private final OpStatsLogger journalCbQueuedLatency; @StatsDoc( name = JOURNAL_NUM_FLUSH_MAX_WAIT, @@ -210,15 +184,11 @@ public JournalStats(StatsLogger statsLogger, final long maxJournalMemoryBytes, 
journalProcessTimeStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_PROCESS_TIME_LATENCY); forceWriteGroupingCountStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_GROUPING_COUNT); - forceWriteGroupingFailures = statsLogger.getCounter(JOURNAL_FORCE_WRITE_GROUPING_FAILURES); forceWriteBatchEntriesStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_BATCH_ENTRIES); forceWriteBatchBytesStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_BATCH_BYTES); journalQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_QUEUE_SIZE); forceWriteQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_FORCE_WRITE_QUEUE_SIZE); - journalCbQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_CB_QUEUE_SIZE); - cbThreadPoolQueueSize = statsLogger.getCounter(BookKeeperServerStats.CB_THREAD_POOL_QUEUE_SIZE); - journalCbQueuedLatency = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_CB_QUEUED_LATENCY); flushMaxWaitCounter = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_WAIT); flushMaxOutstandingBytesCounter = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/OpStatTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/OpStatTest.java index 4e818389bc2..c5abdc521a5 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/OpStatTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/OpStatTest.java @@ -22,7 +22,6 @@ package org.apache.bookkeeper.test; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE; -import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_CB_QUEUE_SIZE; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_QUEUE_SIZE; import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_QUEUE_SIZE; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_SCOPE; @@ -114,7 +113,6 @@ public void testTopLevelBookieWriteCounters() throws Exception { assertTrue(average <= elapsed); }); validateNonMonotonicCounterGauges(stats, new String[]{ - BOOKIE_SCOPE + "." + JOURNAL_SCOPE + ".journalIndex_0." + JOURNAL_CB_QUEUE_SIZE, BOOKIE_SCOPE + "." + JOURNAL_SCOPE + ".journalIndex_0." + JOURNAL_FORCE_WRITE_QUEUE_SIZE, BOOKIE_SCOPE + "." + JOURNAL_SCOPE + ".journalIndex_0." + JOURNAL_QUEUE_SIZE }, (value, max) -> {