diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 90c8acf5af4..ac9df53cd22 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -23,6 +23,7 @@
import java.util.PrimitiveIterator;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.util.Watcher;
+import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
/**
@@ -86,6 +87,8 @@ void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
// TODO: Should be constructed and passed in as a parameter
LedgerStorage getLedgerStorage();
+ void setRequestProcessor(RequestProcessor requestProcessor);
+
// TODO: Move this exceptions somewhere else
/**
* Exception is thrown when no such a ledger is found in this bookie.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index 0db230d9d3d..2b76488cbe9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -69,6 +69,7 @@
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNS;
+import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -1281,4 +1282,11 @@ public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedg
}
}
}
+
+ @Override
+ public void setRequestProcessor(RequestProcessor requestProcessor) {
+ for (Journal journal : journals) {
+ journal.setRequestProcessor(requestProcessor);
+ }
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index eff1673edb0..5f6b60f799b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -51,6 +51,7 @@
import org.apache.bookkeeper.common.util.MemoryLimitController;
import org.apache.bookkeeper.common.util.affinity.CpuAffinity;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -444,6 +445,8 @@ private class ForceWriteThread extends BookieCriticalThread {
// This holds the queue entries that should be notified after a
// successful force write
Thread threadToNotifyOnEx;
+
+ RequestProcessor requestProcessor;
// should we group force writes
private final boolean enableGroupForceWrites;
private final Counter forceWriteThreadTime;
@@ -499,6 +502,10 @@ public void run() {
journalStats.getForceWriteGroupingCountStats()
.registerSuccessfulValue(numReqInLastForceWrite);
+ if (requestProcessor != null) {
+ requestProcessor.flushPendingResponses();
+ }
+
} catch (IOException ioe) {
LOG.error("I/O exception in ForceWrite thread", ioe);
running = false;
@@ -1093,6 +1100,10 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(),
numEntriesToFlush--;
entry.run();
}
+
+ if (forceWriteThread.requestProcessor != null) {
+ forceWriteThread.requestProcessor.flushPendingResponses();
+ }
}
lastFlushPosition = bc.position();
@@ -1211,6 +1222,10 @@ public BufferedChannelBuilder getBufferedChannelBuilder() {
return (FileChannel fc, int capacity) -> new BufferedChannel(allocator, fc, capacity);
}
+ public void setRequestProcessor(RequestProcessor requestProcessor) {
+ forceWriteThread.requestProcessor = requestProcessor;
+ }
+
/**
* Shuts down the journal.
*/
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
index 5a4238e64d6..9f9a0daf682 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
@@ -42,4 +42,9 @@ public interface RequestProcessor extends AutoCloseable {
* channel received the given request r
*/
void processRequest(Object r, BookieRequestHandler channel);
+
+ /**
+ * Flush any pending response staged on all the client connections.
+ */
+ void flushPendingResponses();
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 3e41a3f5ea7..edbffa5f431 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -334,6 +334,14 @@ public Object decode(ByteBuf buffer)
throw new IllegalStateException("Received unknown response : op code = " + opCode);
}
}
+
+ public static void serializeAddResponseInto(int rc, BookieProtocol.ParsedAddRequest req, ByteBuf buf) {
+ buf.writeInt(RESPONSE_HEADERS_SIZE); // Frame size
+ buf.writeInt(PacketHeader.toInt(req.getProtocolVersion(), req.getOpCode(), (short) 0));
+ buf.writeInt(rc); // rc-code
+ buf.writeLong(req.getLedgerId());
+ buf.writeLong(req.getEntryId());
+ }
}
/**
@@ -504,7 +512,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
if (LOG.isTraceEnabled()) {
LOG.trace("Encode response {} to channel {}.", msg, ctx.channel());
}
- if (msg instanceof BookkeeperProtocol.Response) {
+
+ if (msg instanceof ByteBuf) {
+ ctx.write(msg, promise);
+ } else if (msg instanceof BookkeeperProtocol.Response) {
ctx.write(repV3.encode(msg, ctx.alloc()), promise);
} else if (msg instanceof BookieProtocol.Response) {
ctx.write(repPreV3.encode(msg, ctx.alloc()), promise);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
index c9d65a73174..50b7969023e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
@@ -20,26 +20,31 @@
*/
package org.apache.bookkeeper.proto;
+import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import java.nio.channels.ClosedChannelException;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.processor.RequestProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Serverside handler for bookkeeper requests.
*/
+@Slf4j
public class BookieRequestHandler extends ChannelInboundHandlerAdapter {
- private static final Logger LOG = LoggerFactory.getLogger(BookieRequestHandler.class);
+ static final Object EVENT_FLUSH_ALL_PENDING_RESPONSES = new Object();
+
private final RequestProcessor requestProcessor;
private final ChannelGroup allChannels;
private ChannelHandlerContext ctx;
+ private ByteBuf pendingSendResponses = null;
+ private int maxPendingResponsesSize;
+
BookieRequestHandler(ServerConfiguration conf, RequestProcessor processor, ChannelGroup allChannels) {
this.requestProcessor = processor;
this.allChannels = allChannels;
@@ -51,7 +56,7 @@ public ChannelHandlerContext ctx() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
- LOG.info("Channel connected {}", ctx.channel());
+ log.info("Channel connected {}", ctx.channel());
this.ctx = ctx;
super.channelActive(ctx);
}
@@ -63,16 +68,16 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- LOG.info("Channels disconnected: {}", ctx.channel());
+ log.info("Channels disconnected: {}", ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof ClosedChannelException) {
- LOG.info("Client died before request could be completed on {}", ctx.channel(), cause);
+ log.info("Client died before request could be completed on {}", ctx.channel(), cause);
return;
}
- LOG.error("Unhandled exception occurred in I/O thread or handler on {}", ctx.channel(), cause);
+ log.error("Unhandled exception occurred in I/O thread or handler on {}", ctx.channel(), cause);
ctx.close();
}
@@ -84,4 +89,34 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
requestProcessor.processRequest(msg, this);
}
+
+ public synchronized void prepareSendResponseV2(int rc, BookieProtocol.ParsedAddRequest req) {
+ if (pendingSendResponses == null) {
+ pendingSendResponses = ctx.alloc().directBuffer(maxPendingResponsesSize != 0
+ ? maxPendingResponsesSize : 256);
+ }
+
+ BookieProtoEncoding.ResponseEnDeCoderPreV3.serializeAddResponseInto(rc, req, pendingSendResponses);
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ if (evt == EVENT_FLUSH_ALL_PENDING_RESPONSES) {
+ synchronized (this) {
+ if (pendingSendResponses != null) {
+ maxPendingResponsesSize = Math.max(maxPendingResponsesSize,
+ pendingSendResponses.readableBytes());
+ if (ctx.channel().isActive()) {
+ ctx.writeAndFlush(pendingSendResponses, ctx.voidPromise());
+ } else {
+ pendingSendResponses.release();
+ }
+
+ pendingSendResponses = null;
+ }
+ }
+ } else {
+ super.userEventTriggered(ctx, evt);
+ }
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 9237c451ed6..d07aa9cffa0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -30,6 +30,7 @@
import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
+import io.netty.channel.group.ChannelGroup;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.Future;
@@ -118,6 +119,8 @@ public class BookieRequestProcessor implements RequestProcessor {
final Semaphore addsSemaphore;
final Semaphore readsSemaphore;
+ final ChannelGroup allChannels;
+
// to temporary blacklist channels
final Optional> blacklistedChannels;
final Consumer onResponseTimeout;
@@ -127,9 +130,11 @@ public class BookieRequestProcessor implements RequestProcessor {
private final boolean throttleReadResponses;
public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie, StatsLogger statsLogger,
- SecurityHandlerFactory shFactory, ByteBufAllocator allocator) throws SecurityException {
+ SecurityHandlerFactory shFactory, ByteBufAllocator allocator,
+ ChannelGroup allChannels) throws SecurityException {
this.serverCfg = serverCfg;
this.allocator = allocator;
+ this.allChannels = allChannels;
this.waitTimeoutOnBackpressureMillis = serverCfg.getWaitTimeoutOnResponseBackpressureMillis();
this.preserveMdcForTaskExecution = serverCfg.getPreserveMdcForTaskExecution();
this.bookie = bookie;
@@ -694,6 +699,13 @@ private void processReadRequest(final BookieProtocol.ReadRequest r, final Bookie
}
}
+ @Override
+ public void flushPendingResponses() {
+ for (Channel c : allChannels) {
+ c.pipeline().fireUserEventTriggered(BookieRequestHandler.EVENT_FLUSH_ALL_PENDING_RESPONSES);
+ }
+ }
+
public long getWaitTimeoutOnBackpressureMillis() {
return waitTimeoutOnBackpressureMillis;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
index 64f439b2134..caff467db36 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
@@ -102,9 +102,11 @@ public BookieServer(ServerConfiguration conf,
shFactory = SecurityProviderFactoryFactory
.getSecurityProviderFactory(conf.getTLSProviderFactoryClass());
+
this.requestProcessor = new BookieRequestProcessor(conf, bookie,
- statsLogger.scope(SERVER_SCOPE), shFactory, allocator);
+ statsLogger.scope(SERVER_SCOPE), shFactory, allocator, nettyServer.allChannels);
this.nettyServer.setRequestProcessor(this.requestProcessor);
+ this.bookie.setRequestProcessor(this.requestProcessor);
}
/**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index 7e8f9fa768d..29b3a5abb70 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -122,9 +122,10 @@ public void writeComplete(int rc, long ledgerId, long entryId,
requestProcessor.getRequestStats().getAddEntryStats()
.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
}
- sendWriteReqResponse(rc,
- ResponseBuilder.buildAddResponse(request),
- requestProcessor.getRequestStats().getAddRequestStats());
+
+ requestHandler.prepareSendResponseV2(rc, request);
+ requestProcessor.onAddRequestFinish();
+
request.recycle();
recycle();
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
index d5ee8f52750..46304023433 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
@@ -33,6 +33,8 @@
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
@@ -53,12 +55,15 @@ public class TestBookieRequestProcessor {
final BookieRequestProcessor requestProcessor = mock(BookieRequestProcessor.class);
+ private final ChannelGroup channelGroup = new DefaultChannelGroup(null);
+
@Test
public void testConstructLongPollThreads() throws Exception {
// long poll threads == read threads
ServerConfiguration conf = new ServerConfiguration();
try (BookieRequestProcessor processor = new BookieRequestProcessor(
- conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) {
+ conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT,
+ channelGroup)) {
assertSame(processor.getReadThreadPool(), processor.getLongPollThreadPool());
}
@@ -66,7 +71,8 @@ conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocat
conf = new ServerConfiguration();
conf.setNumReadWorkerThreads(0);
try (BookieRequestProcessor processor = new BookieRequestProcessor(
- conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) {
+ conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT,
+ channelGroup)) {
assertNull(processor.getReadThreadPool());
assertNotNull(processor.getLongPollThreadPool());
}
@@ -76,7 +82,8 @@ conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocat
conf.setNumReadWorkerThreads(2);
conf.setNumLongPollWorkerThreads(2);
try (BookieRequestProcessor processor = new BookieRequestProcessor(
- conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) {
+ conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT,
+ channelGroup)) {
assertNotNull(processor.getReadThreadPool());
assertNotNull(processor.getLongPollThreadPool());
assertNotSame(processor.getReadThreadPool(), processor.getLongPollThreadPool());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
index 8fc3a89f004..a02cde4ab99 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
@@ -22,6 +22,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doAnswer;
@@ -181,28 +182,25 @@ public void testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesAllowed(
return null;
}).when(bookie).addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0]));
- AtomicReference