# Group and flush add-responses after journal sync (apache#3837)
### Motivation

Note: this is stacked on top of apache#3830 & apache#3835

This change improves the way AddRequest responses are sent to the client.

The current flow is: 
 * The journal-force-thread issues the fsync on the journal file
 * We iterate over all the entries that were just synced and for each of them:
     1. Trigger `channel.writeAndFlush()`
     2. This will jump to the connection I/O thread (Netty will use a `write()` to an `eventfd` to post the task and wake the epoll)
     3. Write the object to the connection and trigger the serialization logic
     4. Grab a `ByteBuf` from the pool and write ~20 bytes with the response
     5. Write and flush the buffer on the channel
     6. With the flush consolidator we try to group multiple buffers into a single `writev()` syscall, though each call will have a long list of buffers, making the memcpy inefficient.
     7. Release all the buffers and return them to the pool

All these steps are quite expensive when the bookie is receiving a lot of small requests; a simplified sketch of this per-entry path is shown below.
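For illustration only, here is a minimal sketch of that per-entry path. The names (`PerEntryResponder`, `SyncedAdd`) and the payload layout are hypothetical stand-ins; the real path runs through the journal write callback and `ResponseBuilder.buildAddResponse()`:

```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import java.util.List;

// Sketch of the old path (hypothetical names): one event-loop task, one
// pooled ByteBuf, and one flush per synced entry.
final class PerEntryResponder {
    record SyncedAdd(Channel channel, long ledgerId, long entryId) {}

    void respondAfterFsync(List<SyncedAdd> synced) {
        for (SyncedAdd add : synced) {
            // writeAndFlush() posts a task to the connection's event loop (an
            // eventfd write that wakes the epoll) and flushes a tiny buffer.
            add.channel().writeAndFlush(buildResponse(add), add.channel().voidPromise());
        }
    }

    // Stand-in for the real response serialization (~20 bytes per response).
    private ByteBuf buildResponse(SyncedAdd add) {
        ByteBuf buf = add.channel().alloc().buffer(20); // grabbed from the pool
        buf.writeInt(0); // rc = EOK
        buf.writeLong(add.ledgerId());
        buf.writeLong(add.entryId());
        return buf;
    }
}
```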

This PR changes the flow to:

1. journal fsync
2. go through each request and serialize its response into a per-connection `ByteBuf`, without writing it to the channel yet
3. after preparing all the responses, flush them at once: trigger an event on each connection that writes out the accumulated buffer

The advantages are:
 1. 1 `ByteBuf` allocated per connection instead of 1 per request
    1. Fewer allocations and less stress on the buffer pool
    2. More efficient socket `write()` operations
 2. 1 task per connection posted to the Netty I/O threads, instead of 1 per request (see the sketch below).
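The diff below implements this in `BookieRequestHandler` (staging plus the flush event) and `BookieRequestProcessor.flushPendingResponses()` (firing the event on every connection). As a self-contained sketch of the same pattern, with shortened names and a simplified wire format:

```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;

// Condensed sketch of per-connection response batching (simplified names).
class BatchingResponseHandler extends ChannelInboundHandlerAdapter {
    static final Object FLUSH_EVENT = new Object();

    private ChannelHandlerContext ctx;
    private ByteBuf pending;      // staged responses, not yet written to the socket
    private int maxPendingSize;   // high-water mark, reused as the initial capacity

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
        super.channelActive(ctx);
    }

    // Called from the journal force-write thread, once per synced entry.
    synchronized void stageResponse(int rc, long ledgerId, long entryId) {
        if (pending == null) {
            pending = ctx.alloc().directBuffer(maxPendingSize != 0 ? maxPendingSize : 256);
        }
        pending.writeInt(rc);     // simplified wire format for the sketch
        pending.writeLong(ledgerId);
        pending.writeLong(entryId);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt != FLUSH_EVENT) {
            super.userEventTriggered(ctx, evt);
            return;
        }
        synchronized (this) {
            if (pending == null) {
                return;
            }
            maxPendingSize = Math.max(maxPendingSize, pending.readableBytes());
            if (ctx.channel().isActive()) {
                ctx.writeAndFlush(pending, ctx.voidPromise()); // one write for the whole batch
            } else {
                pending.release();
            }
            pending = null;
        }
    }

    // After fsync, the force-write thread posts one task per connection.
    static void flushAll(ChannelGroup allChannels) {
        for (Channel ch : allChannels) {
            ch.pipeline().fireUserEventTriggered(FLUSH_EVENT);
        }
    }
}
```

Compared with one `writeAndFlush()` per request, each journal sync now costs one event-loop task and one contiguous socket write per connection, regardless of how many add responses were grouped into it.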
merlimat authored and Anup Ghatage committed Jul 12, 2024
1 parent 5c89db1 commit 3d76d4a
Showing 11 changed files with 126 additions and 34 deletions.
---
@@ -23,6 +23,7 @@
import java.util.PrimitiveIterator;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;

/**
@@ -86,6 +87,8 @@ void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
// TODO: Should be constructed and passed in as a parameter
LedgerStorage getLedgerStorage();

void setRequestProcessor(RequestProcessor requestProcessor);

// TODO: Move this exceptions somewhere else
/**
* Exception is thrown when no such a ledger is found in this bookie.
---
@@ -69,6 +69,7 @@
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNS;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -1281,4 +1282,11 @@ public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedg
}
}
}

@Override
public void setRequestProcessor(RequestProcessor requestProcessor) {
for (Journal journal : journals) {
journal.setRequestProcessor(requestProcessor);
}
}
}
---
@@ -51,6 +51,7 @@
import org.apache.bookkeeper.common.util.MemoryLimitController;
import org.apache.bookkeeper.common.util.affinity.CpuAffinity;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -444,6 +445,8 @@ private class ForceWriteThread extends BookieCriticalThread {
// This holds the queue entries that should be notified after a
// successful force write
Thread threadToNotifyOnEx;

RequestProcessor requestProcessor;
// should we group force writes
private final boolean enableGroupForceWrites;
private final Counter forceWriteThreadTime;
@@ -499,6 +502,10 @@ public void run() {
journalStats.getForceWriteGroupingCountStats()
.registerSuccessfulValue(numReqInLastForceWrite);

if (requestProcessor != null) {
requestProcessor.flushPendingResponses();
}

} catch (IOException ioe) {
LOG.error("I/O exception in ForceWrite thread", ioe);
running = false;
@@ -1093,6 +1100,10 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(),
numEntriesToFlush--;
entry.run();
}

if (forceWriteThread.requestProcessor != null) {
forceWriteThread.requestProcessor.flushPendingResponses();
}
}

lastFlushPosition = bc.position();
@@ -1211,6 +1222,10 @@ public BufferedChannelBuilder getBufferedChannelBuilder() {
return (FileChannel fc, int capacity) -> new BufferedChannel(allocator, fc, capacity);
}

public void setRequestProcessor(RequestProcessor requestProcessor) {
forceWriteThread.requestProcessor = requestProcessor;
}

/**
* Shuts down the journal.
*/
---
@@ -42,4 +42,9 @@ public interface RequestProcessor extends AutoCloseable {
* channel received the given request <i>r</i>
*/
void processRequest(Object r, BookieRequestHandler channel);

/**
* Flush any pending response staged on all the client connections.
*/
void flushPendingResponses();
}
---
@@ -334,6 +334,14 @@ public Object decode(ByteBuf buffer)
throw new IllegalStateException("Received unknown response : op code = " + opCode);
}
}

public static void serializeAddResponseInto(int rc, BookieProtocol.ParsedAddRequest req, ByteBuf buf) {
buf.writeInt(RESPONSE_HEADERS_SIZE); // Frame size
buf.writeInt(PacketHeader.toInt(req.getProtocolVersion(), req.getOpCode(), (short) 0));
buf.writeInt(rc); // rc-code
buf.writeLong(req.getLedgerId());
buf.writeLong(req.getEntryId());
}
}

/**
@@ -504,7 +512,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
if (LOG.isTraceEnabled()) {
LOG.trace("Encode response {} to channel {}.", msg, ctx.channel());
}
if (msg instanceof BookkeeperProtocol.Response) {

if (msg instanceof ByteBuf) {
ctx.write(msg, promise);
} else if (msg instanceof BookkeeperProtocol.Response) {
ctx.write(repV3.encode(msg, ctx.alloc()), promise);
} else if (msg instanceof BookieProtocol.Response) {
ctx.write(repPreV3.encode(msg, ctx.alloc()), promise);
---
@@ -20,26 +20,31 @@
*/
package org.apache.bookkeeper.proto;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import java.nio.channels.ClosedChannelException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Serverside handler for bookkeeper requests.
*/
@Slf4j
public class BookieRequestHandler extends ChannelInboundHandlerAdapter {

private static final Logger LOG = LoggerFactory.getLogger(BookieRequestHandler.class);
static final Object EVENT_FLUSH_ALL_PENDING_RESPONSES = new Object();

private final RequestProcessor requestProcessor;
private final ChannelGroup allChannels;

private ChannelHandlerContext ctx;

private ByteBuf pendingSendResponses = null;
private int maxPendingResponsesSize;

BookieRequestHandler(ServerConfiguration conf, RequestProcessor processor, ChannelGroup allChannels) {
this.requestProcessor = processor;
this.allChannels = allChannels;
@@ -51,7 +56,7 @@ public ChannelHandlerContext ctx() {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LOG.info("Channel connected {}", ctx.channel());
log.info("Channel connected {}", ctx.channel());
this.ctx = ctx;
super.channelActive(ctx);
}
@@ -63,16 +68,16 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOG.info("Channels disconnected: {}", ctx.channel());
log.info("Channels disconnected: {}", ctx.channel());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof ClosedChannelException) {
LOG.info("Client died before request could be completed on {}", ctx.channel(), cause);
log.info("Client died before request could be completed on {}", ctx.channel(), cause);
return;
}
LOG.error("Unhandled exception occurred in I/O thread or handler on {}", ctx.channel(), cause);
log.error("Unhandled exception occurred in I/O thread or handler on {}", ctx.channel(), cause);
ctx.close();
}

@@ -84,4 +89,34 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
requestProcessor.processRequest(msg, this);
}

public synchronized void prepareSendResponseV2(int rc, BookieProtocol.ParsedAddRequest req) {
if (pendingSendResponses == null) {
pendingSendResponses = ctx.alloc().directBuffer(maxPendingResponsesSize != 0
? maxPendingResponsesSize : 256);
}

BookieProtoEncoding.ResponseEnDeCoderPreV3.serializeAddResponseInto(rc, req, pendingSendResponses);
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == EVENT_FLUSH_ALL_PENDING_RESPONSES) {
synchronized (this) {
if (pendingSendResponses != null) {
maxPendingResponsesSize = Math.max(maxPendingResponsesSize,
pendingSendResponses.readableBytes());
if (ctx.channel().isActive()) {
ctx.writeAndFlush(pendingSendResponses, ctx.voidPromise());
} else {
pendingSendResponses.release();
}

pendingSendResponses = null;
}
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
---
@@ -30,6 +30,7 @@
import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.Future;
@@ -118,6 +119,8 @@ public class BookieRequestProcessor implements RequestProcessor {
final Semaphore addsSemaphore;
final Semaphore readsSemaphore;

final ChannelGroup allChannels;

// to temporary blacklist channels
final Optional<Cache<Channel, Boolean>> blacklistedChannels;
final Consumer<Channel> onResponseTimeout;
@@ -127,9 +130,11 @@ public class BookieRequestProcessor implements RequestProcessor {
private final boolean throttleReadResponses;

public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie, StatsLogger statsLogger,
SecurityHandlerFactory shFactory, ByteBufAllocator allocator) throws SecurityException {
SecurityHandlerFactory shFactory, ByteBufAllocator allocator,
ChannelGroup allChannels) throws SecurityException {
this.serverCfg = serverCfg;
this.allocator = allocator;
this.allChannels = allChannels;
this.waitTimeoutOnBackpressureMillis = serverCfg.getWaitTimeoutOnResponseBackpressureMillis();
this.preserveMdcForTaskExecution = serverCfg.getPreserveMdcForTaskExecution();
this.bookie = bookie;
@@ -694,6 +699,13 @@ private void processReadRequest(final BookieProtocol.ReadRequest r, final Bookie
}
}

@Override
public void flushPendingResponses() {
for (Channel c : allChannels) {
c.pipeline().fireUserEventTriggered(BookieRequestHandler.EVENT_FLUSH_ALL_PENDING_RESPONSES);
}
}

public long getWaitTimeoutOnBackpressureMillis() {
return waitTimeoutOnBackpressureMillis;
}
---
@@ -102,9 +102,11 @@ public BookieServer(ServerConfiguration conf,

shFactory = SecurityProviderFactoryFactory
.getSecurityProviderFactory(conf.getTLSProviderFactoryClass());

this.requestProcessor = new BookieRequestProcessor(conf, bookie,
statsLogger.scope(SERVER_SCOPE), shFactory, allocator);
statsLogger.scope(SERVER_SCOPE), shFactory, allocator, nettyServer.allChannels);
this.nettyServer.setRequestProcessor(this.requestProcessor);
this.bookie.setRequestProcessor(this.requestProcessor);
}

/**
---
@@ -122,9 +122,10 @@ public void writeComplete(int rc, long ledgerId, long entryId,
requestProcessor.getRequestStats().getAddEntryStats()
.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
}
sendWriteReqResponse(rc,
ResponseBuilder.buildAddResponse(request),
requestProcessor.getRequestStats().getAddRequestStats());

requestHandler.prepareSendResponseV2(rc, request);
requestProcessor.onAddRequestFinish();

request.recycle();
recycle();
}
---
@@ -33,6 +33,8 @@
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
@@ -53,20 +55,24 @@ public class TestBookieRequestProcessor {

final BookieRequestProcessor requestProcessor = mock(BookieRequestProcessor.class);

private final ChannelGroup channelGroup = new DefaultChannelGroup(null);

@Test
public void testConstructLongPollThreads() throws Exception {
// long poll threads == read threads
ServerConfiguration conf = new ServerConfiguration();
try (BookieRequestProcessor processor = new BookieRequestProcessor(
conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) {
conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT,
channelGroup)) {
assertSame(processor.getReadThreadPool(), processor.getLongPollThreadPool());
}

// force create long poll threads if there is no read threads
conf = new ServerConfiguration();
conf.setNumReadWorkerThreads(0);
try (BookieRequestProcessor processor = new BookieRequestProcessor(
conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) {
conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT,
channelGroup)) {
assertNull(processor.getReadThreadPool());
assertNotNull(processor.getLongPollThreadPool());
}
@@ -76,7 +82,8 @@ conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocat
conf.setNumReadWorkerThreads(2);
conf.setNumLongPollWorkerThreads(2);
try (BookieRequestProcessor processor = new BookieRequestProcessor(
conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) {
conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT,
channelGroup)) {
assertNotNull(processor.getReadThreadPool());
assertNotNull(processor.getLongPollThreadPool());
assertNotSame(processor.getReadThreadPool(), processor.getLongPollThreadPool());