Simplified the logic for ForceWriteThread after we introduced queue.drainTo() (#3830)

### Motivation

In #3545 we switched the `ForceWriteThread` to take advantage of the `BlockingQueue.drainTo()` method to reduce contention, though the core force-write logic was not touched at the time.
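
For context, `drainTo()` moves everything currently in the queue in a single bulk operation instead of dequeuing entries one by one. A minimal consumer-loop sketch (illustrative only, not the Journal code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class DrainToSketch {
    private final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10_000);
    private volatile boolean running = true;

    void consumerLoop() throws InterruptedException {
        List<Runnable> batch = new ArrayList<>();
        while (running) {
            // Bulk transfer: touches the queue lock once for the whole batch,
            // instead of once per element as repeated take()/poll() would.
            int n = queue.drainTo(batch);
            if (n == 0) {
                // Queue was empty: block for the next element, then keep going.
                batch.add(queue.take());
            }
            for (Runnable r : batch) {
                r.run();
            }
            batch.clear();
        }
    }
}
```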

The force-write logic is quite complicated because it tries to group multiple force-write requests from the queue: it enqueues a marker and groups together all the requests seen until that marker is received. This also adds a bit of lag when many requests are coming in and the IO is stressed, since we wait a bit longer before issuing the fsync.
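
For contrast, the marker-based grouping being removed worked roughly as follows: a marker is enqueued just before an fsync, so any request dequeued ahead of that marker was already in the journal file when the fsync ran and its own fsync can be skipped. A simplified sketch with illustrative names (the exact removed code is visible in the diff below):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified sketch of the old marker-based grouping (illustrative names only).
final class MarkerGroupingSketch {
    static final class Request {
        final boolean isMarker;
        Request(boolean isMarker) { this.isMarker = isMarker; }
        void fsyncFile()     { /* JournalChannel.forceWrite(false) in the real code */ }
        void sendResponses() { /* run the queued write callbacks */ }
    }

    private final BlockingQueue<Request> queue = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    void run() throws InterruptedException {
        boolean shouldForceWrite = true;
        while (running) {
            Request req = queue.take();
            if (req.isMarker) {
                shouldForceWrite = true;        // marker reached: next request needs a new fsync
                continue;
            }
            if (shouldForceWrite) {
                queue.offer(new Request(true)); // marker groups everything already queued behind us
                req.fsyncFile();
                shouldForceWrite = false;       // skip fsync until the marker comes back around
            }
            req.sendResponses();
        }
    }
}
```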

Instead, with the `drainTo()` approach we can greatly simplify the logic and maintain a strict fsync grouping (see the sketch after this list):
 1. drain all the force-write requests available in the queue into a local array list
 2. perform the fsync
 3. update the journal log mark to the position of the last force-write request
 4. trigger the send-responses for all the requests
 5. go back to reading from the queue
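
A self-contained sketch of that loop (the types and method names here are stand-ins, not the exact code in this commit):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the simplified ForceWriteThread loop; ForceWriteRequest here is a stand-in
// for the real Journal.ForceWriteRequest, and fsync()/sendResponses() abstract over
// JournalChannel.forceWrite(false) and the per-entry write callbacks.
final class ForceWriteLoopSketch {
    interface ForceWriteRequest {
        void fsync() throws IOException;   // force the journal file to disk
        long logId();
        long lastFlushedPosition();
        void sendResponses();              // trigger the queued write callbacks
    }

    private final BlockingQueue<ForceWriteRequest> forceWriteRequests = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    void run() throws InterruptedException, IOException {
        List<ForceWriteRequest> localRequests = new ArrayList<>();
        while (running) {
            // 1. drain every pending force-write request; block only if the queue is empty
            int count = forceWriteRequests.drainTo(localRequests);
            if (count == 0) {
                localRequests.add(forceWriteRequests.take());
                count = 1;
            }

            // 2. + 3. a single fsync for the whole batch, then advance the journal log mark
            ForceWriteRequest last = localRequests.get(count - 1);
            last.fsync();
            updateLogMark(last.logId(), last.lastFlushedPosition());

            // 4. everything in the batch is now durable: trigger all the responses
            for (ForceWriteRequest req : localRequests) {
                req.sendResponses();
            }
            localRequests.clear();
            // 5. loop back and read from the queue again
        }
    }

    private void updateLogMark(long logId, long position) {
        // stand-in for lastLogMark.setCurLogMark(logId, position) in the real Journal
    }
}
```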


This refactoring will also enable further improvements to how the send-responses are prepared, since we now have a list of responses ready to send.
merlimat authored Mar 6, 2023
1 parent dfde3d6 commit 128c52e
Showing 4 changed files with 45 additions and 177 deletions.
@@ -109,7 +109,6 @@ public interface BookKeeperServerStats {
String JOURNAL_FORCE_WRITE_ENQUEUE = "JOURNAL_FORCE_WRITE_ENQUEUE";
String JOURNAL_FORCE_WRITE_BATCH_ENTRIES = "JOURNAL_FORCE_WRITE_BATCH_ENTRIES";
String JOURNAL_FORCE_WRITE_BATCH_BYTES = "JOURNAL_FORCE_WRITE_BATCH_BYTES";
String JOURNAL_FORCE_WRITE_GROUPING_FAILURES = "JOURNAL_FORCE_WRITE_GROUPING_FAILURES";
String JOURNAL_FLUSH_LATENCY = "JOURNAL_FLUSH_LATENCY";
String JOURNAL_QUEUE_LATENCY = "JOURNAL_QUEUE_LATENCY";
String JOURNAL_QUEUE_MAX_SIZE = "JOURNAL_QUEUE_MAX_SIZE";
@@ -158,9 +157,6 @@ public interface BookKeeperServerStats {
String INDEX_INMEM_ILLEGAL_STATE_RESET = "INDEX_INMEM_ILLEGAL_STATE_RESET";
String INDEX_INMEM_ILLEGAL_STATE_DELETE = "INDEX_INMEM_ILLEGAL_STATE_DELETE";
String JOURNAL_FORCE_WRITE_QUEUE_SIZE = "JOURNAL_FORCE_WRITE_QUEUE_SIZE";
String JOURNAL_CB_QUEUE_SIZE = "JOURNAL_CB_QUEUE_SIZE";
String CB_THREAD_POOL_QUEUE_SIZE = "CB_THREAD_POOL_QUEUE_SIZE";
String JOURNAL_CB_QUEUED_LATENCY = "JOURNAL_CB_QUEUED_LATENCY";
String JOURNAL_NUM_FLUSH_EMPTY_QUEUE = "JOURNAL_NUM_FLUSH_EMPTY_QUEUE";
String JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES = "JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES";
String JOURNAL_NUM_FLUSH_MAX_WAIT = "JOURNAL_NUM_FLUSH_MAX_WAIT";
@@ -299,19 +299,14 @@ static class QueueEntry implements Runnable {
WriteCallback cb;
Object ctx;
long enqueueTime;
long enqueueCbThreadPooleQueueTime;
boolean ackBeforeSync;

OpStatsLogger journalAddEntryStats;
OpStatsLogger journalCbQueuedLatency;
Counter journalCbQueueSize;
Counter cbThreadPoolQueueSize;
Counter callbackTime;

static QueueEntry create(ByteBuf entry, boolean ackBeforeSync, long ledgerId, long entryId,
WriteCallback cb, Object ctx, long enqueueTime, OpStatsLogger journalAddEntryStats,
Counter journalCbQueueSize, Counter cbThreadPoolQueueSize,
OpStatsLogger journalCbQueuedLatency, Counter callbackTime) {
Counter callbackTime) {
QueueEntry qe = RECYCLER.get();
qe.entry = entry;
qe.ackBeforeSync = ackBeforeSync;
@@ -321,27 +316,16 @@ static QueueEntry create(ByteBuf entry, boolean ackBeforeSync, long ledgerId, lo
qe.entryId = entryId;
qe.enqueueTime = enqueueTime;
qe.journalAddEntryStats = journalAddEntryStats;
qe.journalCbQueuedLatency = journalCbQueuedLatency;
qe.journalCbQueueSize = journalCbQueueSize;
qe.cbThreadPoolQueueSize = cbThreadPoolQueueSize;
qe.callbackTime = callbackTime;
return qe;
}

public void setEnqueueCbThreadPooleQueueTime(long enqueueCbThreadPooleQueueTime) {
this.enqueueCbThreadPooleQueueTime = enqueueCbThreadPooleQueueTime;
}

@Override
public void run() {
journalCbQueuedLatency.registerSuccessfulEvent(
MathUtils.elapsedNanos(enqueueCbThreadPooleQueueTime), TimeUnit.NANOSECONDS);
long startTime = System.nanoTime();
if (LOG.isDebugEnabled()) {
LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId);
}
journalCbQueueSize.dec();
cbThreadPoolQueueSize.dec();
journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS);
cb.writeComplete(0, ledgerId, entryId, null, ctx);
callbackTime.addLatency(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
@@ -366,9 +350,6 @@ private void recycle() {
this.cb = null;
this.ctx = null;
this.journalAddEntryStats = null;
this.journalCbQueuedLatency = null;
this.journalCbQueueSize = null;
this.cbThreadPoolQueueSize = null;
this.callbackTime = null;
recyclerHandle.recycle(this);
}
@@ -378,51 +359,25 @@ private void recycle() {
* Token which represents the need to force a write to the Journal.
*/
@VisibleForTesting
public class ForceWriteRequest {
public static class ForceWriteRequest {
private JournalChannel logFile;
private RecyclableArrayList<QueueEntry> forceWriteWaiters;
private boolean shouldClose;
private boolean isMarker;
private long lastFlushedPosition;
private long logId;
private long enqueueTime;

public int process(boolean shouldForceWrite) throws IOException {
journalStats.getForceWriteQueueSize().dec();
journalStats.getFwEnqueueTimeStats()
.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime), TimeUnit.NANOSECONDS);

if (isMarker) {
return 0;
}
public int process() {
closeFileIfNecessary();

long startTime = MathUtils.nowInNano();
try {
if (shouldForceWrite) {
this.logFile.forceWrite(false);
journalStats.getJournalSyncStats()
.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
}
lastLogMark.setCurLogMark(this.logId, this.lastFlushedPosition);

// Notify the waiters that the force write succeeded
for (int i = 0; i < forceWriteWaiters.size(); i++) {
QueueEntry qe = forceWriteWaiters.get(i);
if (qe != null) {
qe.setEnqueueCbThreadPooleQueueTime(MathUtils.nowInNano());
journalStats.getCbThreadPoolQueueSize().inc();
qe.run();
}
// Notify the waiters that the force write succeeded
for (int i = 0; i < forceWriteWaiters.size(); i++) {
QueueEntry qe = forceWriteWaiters.get(i);
if (qe != null) {
qe.run();
}

return forceWriteWaiters.size();
} catch (IOException e) {
journalStats.getJournalSyncStats()
.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
throw e;
} finally {
closeFileIfNecessary();
}

return forceWriteWaiters.size();
}

public void closeFileIfNecessary() {
@@ -431,6 +386,7 @@ public void closeFileIfNecessary() {
// We should guard against exceptions so its
// safe to call in catch blocks
try {
logFile.forceWrite(false);
logFile.close();
// Call close only once
shouldClose = false;
@@ -460,21 +416,18 @@ private ForceWriteRequest createForceWriteRequest(JournalChannel logFile,
long logId,
long lastFlushedPosition,
RecyclableArrayList<QueueEntry> forceWriteWaiters,
boolean shouldClose,
boolean isMarker) {
boolean shouldClose) {
ForceWriteRequest req = forceWriteRequestsRecycler.get();
req.forceWriteWaiters = forceWriteWaiters;
req.logFile = logFile;
req.logId = logId;
req.lastFlushedPosition = lastFlushedPosition;
req.shouldClose = shouldClose;
req.isMarker = isMarker;
req.enqueueTime = MathUtils.nowInNano();
journalStats.getForceWriteQueueSize().inc();
return req;
}

private final Recycler<ForceWriteRequest> forceWriteRequestsRecycler = new Recycler<ForceWriteRequest>() {
private static final Recycler<ForceWriteRequest> forceWriteRequestsRecycler = new Recycler<ForceWriteRequest>() {
@Override
protected ForceWriteRequest newObject(
Recycler.Handle<ForceWriteRequest> handle) {
@@ -495,10 +448,6 @@ private class ForceWriteThread extends BookieCriticalThread {
private final boolean enableGroupForceWrites;
private final Counter forceWriteThreadTime;

boolean shouldForceWrite = true;
int numReqInLastForceWrite = 0;
boolean forceWriteMarkerSent = false;

public ForceWriteThread(Thread threadToNotifyOnEx,
boolean enableGroupForceWrites,
StatsLogger statsLogger) {
@@ -520,12 +469,11 @@ public void run() {
}
}

long busyStartTime = System.nanoTime();

List<ForceWriteRequest> localRequests = new ArrayList<>();
final List<ForceWriteRequest> localRequests = new ArrayList<>();

while (running) {
try {
int numReqInLastForceWrite = 0;

int requestsCount = forceWriteRequests.drainTo(localRequests);
if (requestsCount == 0) {
@@ -534,11 +482,26 @@ public void run() {
requestsCount = 1;
}

journalStats.getForceWriteQueueSize().addCount(-requestsCount);

// Sync and mark the journal up to the position of the last entry in the batch
ForceWriteRequest lastRequest = localRequests.get(requestsCount - 1);
syncJournal(lastRequest);

// All the requests in the batch are now fully-synced. We can trigger sending the
// responses
for (int i = 0; i < requestsCount; i++) {
forceWriteThreadTime.addLatency(MathUtils.elapsedNanos(busyStartTime), TimeUnit.NANOSECONDS);
processForceWriteRequest(localRequests.get(i));
busyStartTime = System.nanoTime();
ForceWriteRequest req = localRequests.get(i);
numReqInLastForceWrite += req.process();
req.recycle();
}

journalStats.getForceWriteGroupingCountStats()
.registerSuccessfulValue(numReqInLastForceWrite);

} catch (IOException ioe) {
LOG.error("I/O exception in ForceWrite thread", ioe);
running = false;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.info("ForceWrite thread interrupted");
@@ -559,66 +522,17 @@ public void run() {
threadToNotifyOnEx.interrupt();
}

private void processForceWriteRequest(ForceWriteRequest req) {
private void syncJournal(ForceWriteRequest lastRequest) throws IOException {
long fsyncStartTime = MathUtils.nowInNano();
try {
// Force write the file and then notify the write completions
//
if (!req.isMarker) {
if (shouldForceWrite) {
// if we are going to force write, any request that is already in the
// queue will benefit from this force write - post a marker prior to issuing
// the flush so until this marker is encountered we can skip the force write
if (enableGroupForceWrites) {
ForceWriteRequest marker =
createForceWriteRequest(req.logFile, 0, 0, null, false, true);
forceWriteMarkerSent = forceWriteRequests.offer(marker);
if (!forceWriteMarkerSent) {
marker.recycle();
Counter failures = journalStats.getForceWriteGroupingFailures();
failures.inc();
LOG.error(
"Fail to send force write grouping marker,"
+ " Journal.forceWriteRequests queue(capacity {}) is full,"
+ " current failure counter is {}.",
conf.getJournalQueueSize(), failures.get());
}
}

// If we are about to issue a write, record the number of requests in
// the last force write and then reset the counter so we can accumulate
// requests in the write we are about to issue
if (numReqInLastForceWrite > 0) {
journalStats.getForceWriteGroupingCountStats()
.registerSuccessfulValue(numReqInLastForceWrite);
numReqInLastForceWrite = 0;
}
}
}
numReqInLastForceWrite += req.process(shouldForceWrite);

if (enableGroupForceWrites
// if its a marker we should switch back to flushing
&& !req.isMarker
// If group marker sending failed, we can't figure out which writes are
// grouped in this force write. So, abandon it even if other writes could
// be grouped. This should be extremely rare as, usually, queue size is
// large enough to accommodate high flush frequencies.
&& forceWriteMarkerSent
// This indicates that this is the last request in a given file
// so subsequent requests will go to a different file so we should
// flush on the next request
&& !req.shouldClose) {
shouldForceWrite = false;
} else {
shouldForceWrite = true;
}
} catch (IOException ioe) {
LOG.error("I/O exception in ForceWrite thread", ioe);
running = false;
} finally {
if (req != null) {
req.recycle();
}
lastRequest.logFile.forceWrite(false);
journalStats.getJournalSyncStats().registerSuccessfulEvent(MathUtils.elapsedNanos(fsyncStartTime),
TimeUnit.NANOSECONDS);
lastLogMark.setCurLogMark(lastRequest.logId, lastRequest.lastFlushedPosition);
} catch (IOException ioe) {
journalStats.getJournalSyncStats()
.registerFailedEvent(MathUtils.elapsedNanos(fsyncStartTime), TimeUnit.NANOSECONDS);
throw ioe;
}
}

@@ -972,16 +886,12 @@ public void logAddEntry(long ledgerId, long entryId, ByteBuf entry,
entry.retain();

journalStats.getJournalQueueSize().inc();
journalStats.getJournalCbQueueSize().inc();

memoryLimitController.reserveMemory(entry.readableBytes());

queue.put(QueueEntry.create(
entry, ackBeforeSync, ledgerId, entryId, cb, ctx, MathUtils.nowInNano(),
journalStats.getJournalAddEntryStats(),
journalStats.getJournalCbQueueSize(),
journalStats.getCbThreadPoolQueueSize(),
journalStats.getJournalCbQueuedLatency(),
callbackTime));
}

@@ -990,13 +900,9 @@ void forceLedger(long ledgerId, WriteCallback cb, Object ctx) {
null, false /* ackBeforeSync */, ledgerId,
BookieImpl.METAENTRY_ID_FORCE_LEDGER, cb, ctx, MathUtils.nowInNano(),
journalStats.getJournalForceLedgerStats(),
journalStats.getJournalCbQueueSize(),
journalStats.getCbThreadPoolQueueSize(),
journalStats.getJournalCbQueuedLatency(),
callbackTime));
// Increment afterwards because the add operation could fail.
journalStats.getJournalQueueSize().inc();
journalStats.getJournalCbQueueSize().inc();
}

/**
@@ -1185,8 +1091,6 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(),
if (entry != null && (!syncData || entry.ackBeforeSync)) {
toFlush.set(i, null);
numEntriesToFlush--;
entry.setEnqueueCbThreadPooleQueueTime(MathUtils.nowInNano());
journalStats.getCbThreadPoolQueueSize().inc();
entry.run();
}
}
@@ -1225,7 +1129,7 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(),
|| (System.currentTimeMillis() - lastFlushTimeMs
>= journalPageCacheFlushIntervalMSec)) {
forceWriteRequests.put(createForceWriteRequest(logFile, logId, lastFlushPosition,
toFlush, shouldRolloverJournal, false));
toFlush, shouldRolloverJournal));
lastFlushTimeMs = System.currentTimeMillis();
}
toFlush = entryListRecycler.newInstance();