Skip to content

Commit

Permalink
Pass BookieRequestHandler instead of Channel to the request processors (
Browse files Browse the repository at this point in the history
#3835)

* Pass BookieRequestHandler instead of Channel to the request processors

* Fixed checkstyle

* Fixed ForceLedgerProcessorV3Test

* Fixed TestBookieRequestProcessor

* Fixed line length
  • Loading branch information
merlimat authored Mar 5, 2023
1 parent b4112df commit dfde3d6
Show file tree
Hide file tree
Showing 22 changed files with 223 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
*/
package org.apache.bookkeeper.processor;

import io.netty.channel.Channel;
import org.apache.bookkeeper.proto.BookieRequestHandler;

/**
* A request processor that is used for processing requests at bookie side.
Expand All @@ -41,5 +41,5 @@ public interface RequestProcessor extends AutoCloseable {
* @param channel
* channel received the given request <i>r</i>
*/
void processRequest(Object r, Channel channel);
void processRequest(Object r, BookieRequestHandler channel);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,27 @@
/**
* Serverside handler for bookkeeper requests.
*/
class BookieRequestHandler extends ChannelInboundHandlerAdapter {
public class BookieRequestHandler extends ChannelInboundHandlerAdapter {

private static final Logger LOG = LoggerFactory.getLogger(BookieRequestHandler.class);
private final RequestProcessor requestProcessor;
private final ChannelGroup allChannels;

private ChannelHandlerContext ctx;

BookieRequestHandler(ServerConfiguration conf, RequestProcessor processor, ChannelGroup allChannels) {
this.requestProcessor = processor;
this.allChannels = allChannels;
}

public ChannelHandlerContext ctx() {
return ctx;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LOG.info("Channel connected {}", ctx.channel());
this.ctx = ctx;
super.channelActive(ctx);
}

Expand Down Expand Up @@ -75,6 +82,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
ctx.fireChannelRead(msg);
return;
}
requestProcessor.processRequest(msg, ctx.channel());
requestProcessor.processRequest(msg, this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,8 @@ private void shutdownExecutor(OrderedExecutor service) {
}

@Override
public void processRequest(Object msg, Channel c) {
public void processRequest(Object msg, BookieRequestHandler requestHandler) {
Channel channel = requestHandler.ctx().channel();
// If we can decode this packet as a Request protobuf packet, process
// it as a version 3 packet. Else, just use the old protocol.
if (msg instanceof BookkeeperProtocol.Request) {
Expand All @@ -309,16 +310,16 @@ public void processRequest(Object msg, Channel c) {
BookkeeperProtocol.BKPacketHeader header = r.getHeader();
switch (header.getOperation()) {
case ADD_ENTRY:
processAddRequestV3(r, c);
processAddRequestV3(r, requestHandler);
break;
case READ_ENTRY:
processReadRequestV3(r, c);
processReadRequestV3(r, requestHandler);
break;
case FORCE_LEDGER:
processForceLedgerRequestV3(r, c);
processForceLedgerRequestV3(r, requestHandler);
break;
case AUTH:
LOG.info("Ignoring auth operation from client {}", c.remoteAddress());
LOG.info("Ignoring auth operation from client {}", channel.remoteAddress());
BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage
.newBuilder()
.setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME)
Expand All @@ -328,29 +329,29 @@ public void processRequest(Object msg, Channel c) {
.newBuilder().setHeader(r.getHeader())
.setStatus(BookkeeperProtocol.StatusCode.EOK)
.setAuthResponse(message);
c.writeAndFlush(authResponse.build());
channel.writeAndFlush(authResponse.build());
break;
case WRITE_LAC:
processWriteLacRequestV3(r, c);
processWriteLacRequestV3(r, requestHandler);
break;
case READ_LAC:
processReadLacRequestV3(r, c);
processReadLacRequestV3(r, requestHandler);
break;
case GET_BOOKIE_INFO:
processGetBookieInfoRequestV3(r, c);
processGetBookieInfoRequestV3(r, requestHandler);
break;
case START_TLS:
processStartTLSRequestV3(r, c);
processStartTLSRequestV3(r, requestHandler);
break;
case GET_LIST_OF_ENTRIES_OF_LEDGER:
processGetListOfEntriesOfLedgerProcessorV3(r, c);
processGetListOfEntriesOfLedgerProcessorV3(r, requestHandler);
break;
default:
LOG.info("Unknown operation type {}", header.getOperation());
BookkeeperProtocol.Response.Builder response =
BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader())
.setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
c.writeAndFlush(response.build());
channel.writeAndFlush(response.build());
if (statsEnabled) {
bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
}
Expand All @@ -365,26 +366,27 @@ public void processRequest(Object msg, Channel c) {
switch (r.getOpCode()) {
case BookieProtocol.ADDENTRY:
checkArgument(r instanceof BookieProtocol.ParsedAddRequest);
processAddRequest((BookieProtocol.ParsedAddRequest) r, c);
processAddRequest((BookieProtocol.ParsedAddRequest) r, requestHandler);
break;
case BookieProtocol.READENTRY:
checkArgument(r instanceof BookieProtocol.ReadRequest);
processReadRequest((BookieProtocol.ReadRequest) r, c);
processReadRequest((BookieProtocol.ReadRequest) r, requestHandler);
break;
case BookieProtocol.AUTH:
LOG.info("Ignoring auth operation from client {}", c.remoteAddress());
LOG.info("Ignoring auth operation from client {}",
requestHandler.ctx().channel().remoteAddress());
BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage
.newBuilder()
.setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME)
.setPayload(ByteString.copyFrom(AuthToken.NULL.getData()))
.build();

c.writeAndFlush(new BookieProtocol.AuthResponse(
channel.writeAndFlush(new BookieProtocol.AuthResponse(
BookieProtocol.CURRENT_PROTOCOL_VERSION, message));
break;
default:
LOG.error("Unknown op type {}, sending error", r.getOpCode());
c.writeAndFlush(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r));
channel.writeAndFlush(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r));
if (statsEnabled) {
bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
}
Expand All @@ -402,26 +404,28 @@ private void restoreMdcContextFromRequest(BookkeeperProtocol.Request req) {
}
}

private void processWriteLacRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
WriteLacProcessorV3 writeLac = new WriteLacProcessorV3(r, c, this);
private void processWriteLacRequestV3(final BookkeeperProtocol.Request r,
final BookieRequestHandler requestHandler) {
WriteLacProcessorV3 writeLac = new WriteLacProcessorV3(r, requestHandler, this);
if (null == writeThreadPool) {
writeLac.run();
} else {
writeThreadPool.executeOrdered(r.getAddRequest().getLedgerId(), writeLac);
}
}

private void processReadLacRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
ReadLacProcessorV3 readLac = new ReadLacProcessorV3(r, c, this);
private void processReadLacRequestV3(final BookkeeperProtocol.Request r,
final BookieRequestHandler requestHandler) {
ReadLacProcessorV3 readLac = new ReadLacProcessorV3(r, requestHandler, this);
if (null == readThreadPool) {
readLac.run();
} else {
readThreadPool.executeOrdered(r.getAddRequest().getLedgerId(), readLac);
}
}

private void processAddRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, c, this);
private void processAddRequestV3(final BookkeeperProtocol.Request r, final BookieRequestHandler requestHandler) {
WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, requestHandler, this);

final OrderedExecutor threadPool;
if (RequestUtils.isHighPriority(r)) {
Expand Down Expand Up @@ -455,8 +459,9 @@ private void processAddRequestV3(final BookkeeperProtocol.Request r, final Chann
}
}

private void processForceLedgerRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
ForceLedgerProcessorV3 forceLedger = new ForceLedgerProcessorV3(r, c, this);
private void processForceLedgerRequestV3(final BookkeeperProtocol.Request r,
final BookieRequestHandler requestHandler) {
ForceLedgerProcessorV3 forceLedger = new ForceLedgerProcessorV3(r, requestHandler, this);

final OrderedExecutor threadPool;
if (RequestUtils.isHighPriority(r)) {
Expand Down Expand Up @@ -492,19 +497,20 @@ private void processForceLedgerRequestV3(final BookkeeperProtocol.Request r, fin
}
}

private void processReadRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
ExecutorService fenceThread = null == highPriorityThreadPool ? null : highPriorityThreadPool.chooseThread(c);
private void processReadRequestV3(final BookkeeperProtocol.Request r, final BookieRequestHandler requestHandler) {
ExecutorService fenceThread = null == highPriorityThreadPool ? null :
highPriorityThreadPool.chooseThread(requestHandler.ctx());

final ReadEntryProcessorV3 read;
final OrderedExecutor threadPool;
if (RequestUtils.isLongPollReadRequest(r.getReadRequest())) {
ExecutorService lpThread = longPollThreadPool.chooseThread(c);
ExecutorService lpThread = longPollThreadPool.chooseThread(requestHandler.ctx());

read = new LongPollReadEntryProcessorV3(r, c, this, fenceThread,
read = new LongPollReadEntryProcessorV3(r, requestHandler, this, fenceThread,
lpThread, requestTimer);
threadPool = longPollThreadPool;
} else {
read = new ReadEntryProcessorV3(r, c, this, fenceThread);
read = new ReadEntryProcessorV3(r, requestHandler, this, fenceThread);

// If it's a high priority read (fencing or as part of recovery process), we want to make sure it
// gets executed as fast as possible, so bypass the normal readThreadPool
Expand Down Expand Up @@ -544,13 +550,16 @@ private void processReadRequestV3(final BookkeeperProtocol.Request r, final Chan
}
}

private void processStartTLSRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
private void processStartTLSRequestV3(final BookkeeperProtocol.Request r,
final BookieRequestHandler requestHandler) {
BookkeeperProtocol.Response.Builder response = BookkeeperProtocol.Response.newBuilder();
BookkeeperProtocol.BKPacketHeader.Builder header = BookkeeperProtocol.BKPacketHeader.newBuilder();
header.setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE);
header.setOperation(r.getHeader().getOperation());
header.setTxnId(r.getHeader().getTxnId());
response.setHeader(header.build());
final Channel c = requestHandler.ctx().channel();

if (shFactory == null) {
LOG.error("Got StartTLS request but TLS not configured");
response.setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
Expand Down Expand Up @@ -596,27 +605,29 @@ public void operationComplete(Future<Channel> future) throws Exception {
}
}

private void processGetBookieInfoRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
GetBookieInfoProcessorV3 getBookieInfo = new GetBookieInfoProcessorV3(r, c, this);
private void processGetBookieInfoRequestV3(final BookkeeperProtocol.Request r,
final BookieRequestHandler requestHandler) {
GetBookieInfoProcessorV3 getBookieInfo = new GetBookieInfoProcessorV3(r, requestHandler, this);
if (null == readThreadPool) {
getBookieInfo.run();
} else {
readThreadPool.submit(getBookieInfo);
}
}

private void processGetListOfEntriesOfLedgerProcessorV3(final BookkeeperProtocol.Request r, final Channel c) {
GetListOfEntriesOfLedgerProcessorV3 getListOfEntriesOfLedger = new GetListOfEntriesOfLedgerProcessorV3(r, c,
this);
private void processGetListOfEntriesOfLedgerProcessorV3(final BookkeeperProtocol.Request r,
final BookieRequestHandler requestHandler) {
GetListOfEntriesOfLedgerProcessorV3 getListOfEntriesOfLedger =
new GetListOfEntriesOfLedgerProcessorV3(r, requestHandler, this);
if (null == readThreadPool) {
getListOfEntriesOfLedger.run();
} else {
readThreadPool.submit(getListOfEntriesOfLedger);
}
}

private void processAddRequest(final BookieProtocol.ParsedAddRequest r, final Channel c) {
WriteEntryProcessor write = WriteEntryProcessor.create(r, c, this);
private void processAddRequest(final BookieProtocol.ParsedAddRequest r, final BookieRequestHandler requestHandler) {
WriteEntryProcessor write = WriteEntryProcessor.create(r, requestHandler, this);

// If it's a high priority add (usually as part of recovery process), we want to make sure it gets
// executed as fast as possible, so bypass the normal writeThreadPool and execute in highPriorityThreadPool
Expand Down Expand Up @@ -647,10 +658,11 @@ private void processAddRequest(final BookieProtocol.ParsedAddRequest r, final Ch
}
}

private void processReadRequest(final BookieProtocol.ReadRequest r, final Channel c) {
private void processReadRequest(final BookieProtocol.ReadRequest r, final BookieRequestHandler requestHandler) {
ExecutorService fenceThreadPool =
null == highPriorityThreadPool ? null : highPriorityThreadPool.chooseThread(c);
ReadEntryProcessor read = ReadEntryProcessor.create(r, c, this, fenceThreadPool, throttleReadResponses);
null == highPriorityThreadPool ? null : highPriorityThreadPool.chooseThread(requestHandler.ctx());
ReadEntryProcessor read = ReadEntryProcessor.create(r, requestHandler,
this, fenceThreadPool, throttleReadResponses);

// If it's a high priority read (fencing or as part of recovery process), we want to make sure it
// gets executed as fast as possible, so bypass the normal readThreadPool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import static com.google.common.base.Preconditions.checkArgument;

import io.netty.channel.Channel;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.net.BookieId;
Expand All @@ -39,9 +38,9 @@
class ForceLedgerProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(ForceLedgerProcessorV3.class);

public ForceLedgerProcessorV3(Request request, Channel channel,
public ForceLedgerProcessorV3(Request request, BookieRequestHandler requestHandler,
BookieRequestProcessor requestProcessor) {
super(request, channel, requestProcessor);
super(request, requestHandler, requestProcessor);
}

// Returns null if there is no exception thrown
Expand Down Expand Up @@ -98,7 +97,7 @@ private ForceLedgerResponse getForceLedgerResponse() {
};
StatusCode status = null;
try {
requestProcessor.getBookie().forceLedger(ledgerId, wcb, channel);
requestProcessor.getBookie().forceLedger(ledgerId, wcb, requestHandler);
status = StatusCode.EOK;
} catch (Throwable t) {
logger.error("Unexpected exception while forcing ledger {} : ", ledgerId, t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
*/
package org.apache.bookkeeper.proto;

import io.netty.channel.Channel;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest;
Expand All @@ -38,9 +37,9 @@
public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(GetBookieInfoProcessorV3.class);

public GetBookieInfoProcessorV3(Request request, Channel channel,
public GetBookieInfoProcessorV3(Request request, BookieRequestHandler requestHandler,
BookieRequestProcessor requestProcessor) {
super(request, channel, requestProcessor);
super(request, requestHandler, requestProcessor);
}

private GetBookieInfoResponse getGetBookieInfoResponse() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package org.apache.bookkeeper.proto;

import com.google.protobuf.ByteString;
import io.netty.channel.Channel;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.Bookie;
Expand All @@ -44,9 +43,9 @@ public class GetListOfEntriesOfLedgerProcessorV3 extends PacketProcessorBaseV3 i
protected final GetListOfEntriesOfLedgerRequest getListOfEntriesOfLedgerRequest;
protected final long ledgerId;

public GetListOfEntriesOfLedgerProcessorV3(Request request, Channel channel,
public GetListOfEntriesOfLedgerProcessorV3(Request request, BookieRequestHandler requestHandler,
BookieRequestProcessor requestProcessor) {
super(request, channel, requestProcessor);
super(request, requestHandler, requestProcessor);
this.getListOfEntriesOfLedgerRequest = request.getGetListOfEntriesOfLedgerRequest();
this.ledgerId = getListOfEntriesOfLedgerRequest.getLedgerId();
}
Expand Down
Loading

0 comments on commit dfde3d6

Please sign in to comment.