Skip to content

Commit

Permalink
(feat) add FixedThreadsExecutorGroup #168
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed May 22, 2019
1 parent b76e3e6 commit e79dda1
Show file tree
Hide file tree
Showing 46 changed files with 831 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,25 @@ public boolean disconnect(final Endpoint endpoint) {
@Override
public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
final RpcResponseClosure<T> done, final int timeoutMs) {
return invokeWithDone(endpoint, request, this.defaultInvokeCtx, done, timeoutMs);
return invokeWithDone(endpoint, request, this.defaultInvokeCtx, done, timeoutMs, this.rpcExecutor);
}

public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
final RpcResponseClosure<T> done, final int timeoutMs,
final Executor rpcExecutor) {
return invokeWithDone(endpoint, request, this.defaultInvokeCtx, done, timeoutMs, rpcExecutor);
}

public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
final InvokeContext ctx,
final RpcResponseClosure<T> done, final int timeoutMs) {
return invokeWithDone(endpoint, request, ctx, done, timeoutMs, this.rpcExecutor);
}

public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
final InvokeContext ctx,
final RpcResponseClosure<T> done, final int timeoutMs,
final Executor rpcExecutor) {
final FutureImpl<Message> future = new FutureImpl<>();
try {
final Url rpcUrl = this.rpcAddressParser.parse(endpoint.toString());
Expand Down Expand Up @@ -221,7 +234,7 @@ public void onException(final Throwable e) {

@Override
public Executor getExecutor() {
return AbstractBoltClientService.this.rpcExecutor;
return rpcExecutor != null ? rpcExecutor : AbstractBoltClientService.this.rpcExecutor;
}
}, timeoutMs <= 0 ? this.rpcOptions.getRpcDefaultTimeout() : timeoutMs);
} catch (final InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,33 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.RejectedExecutionHandler;
import io.netty.util.concurrent.RejectedExecutionHandlers;

import org.apache.commons.lang.StringUtils;

import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext;
import com.alipay.remoting.Connection;
import com.alipay.remoting.ConnectionEventProcessor;
import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.NodeManager;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.rpc.RaftServerService;
import com.alipay.sofa.jraft.rpc.RpcRequestClosure;
import com.alipay.sofa.jraft.rpc.RpcRequests.AppendEntriesRequest;
import com.alipay.sofa.jraft.rpc.RpcRequests.AppendEntriesRequestHeader;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.Utils;
import com.alipay.sofa.jraft.util.concurrent.DefaultFixedThreadsExecutorGroupFactory;
import com.alipay.sofa.jraft.util.concurrent.FixedThreadsExecutorGroup;
import com.alipay.sofa.jraft.util.concurrent.SingleThreadExecutor;
import com.google.protobuf.Message;

import io.netty.util.concurrent.DefaultEventExecutor;

/**
* Append entries request processor.
*
Expand All @@ -51,21 +58,24 @@
public class AppendEntriesRequestProcessor extends NodeRequestProcessor<AppendEntriesRequest> implements
ConnectionEventProcessor {

static final String PEER_ATTR = "jraft-peer";
static final String PEER_ATTR = "jraft-peer";

private static final FixedThreadsExecutorGroup APPEND_ENTRIES_EXECUTORS = createGlobalAppendEntriesExecutors();

/**
* Peer executor selector.
* @author dennis
*
*/
final class PeerExecutorSelector implements ExecutorSelector {

private final ConcurrentMap<Node, Executor> executorMap = new ConcurrentHashMap<>();

PeerExecutorSelector() {
super();
}

@Override
public Executor select(String requestClass, Object requestHeader) {
public Executor select(final String requestClass, final Object requestHeader) {
final AppendEntriesRequestHeader header = (AppendEntriesRequestHeader) requestHeader;
final String groupId = header.getGroupId();
final String peerId = header.getPeerId();
Expand All @@ -78,9 +88,13 @@ public Executor select(String requestClass, Object requestHeader) {

final Node node = NodeManager.getInstance().get(groupId, peer);

if (node == null || !node.getRaftOptions().isReplicatorPipeline()) {
if (node == null) {
return getExecutor();
}
if (!node.getRaftOptions().isReplicatorPipeline()) {
return this.executorMap.computeIfAbsent(node, s -> APPEND_ENTRIES_EXECUTORS.next());
}

// The node enable pipeline, we should ensure bolt support it.
Utils.ensureBoltPipeline();

Expand Down Expand Up @@ -109,25 +123,23 @@ public SequenceRpcRequestClosure(RpcRequestClosure parent, int sequence, String
}

@Override
public void sendResponse(Message msg) {
sendSequenceResponse(groupId, peerId, this.reqSequence, getAsyncContext(), getBizContext(), msg);
public void sendResponse(final Message msg) {
sendSequenceResponse(this.groupId, this.peerId, this.reqSequence, getAsyncContext(), getBizContext(), msg);
}

}

/**
* Response message wrapper with a request sequence number and asyncContex.done
* Response message wrapper with a request sequence number and asyncContext.done
* @author dennis
*
*/
static class SequanceMessage implements Comparable<SequanceMessage> {
static class SequenceMessage implements Comparable<SequenceMessage> {
public final Message msg;
private final int sequence;
private final AsyncContext asyncContex;
private final AsyncContext asyncContext;

public SequanceMessage(AsyncContext asyncContex, Message msg, int sequence) {
public SequenceMessage(AsyncContext asyncContext, Message msg, int sequence) {
super();
this.asyncContex = asyncContex;
this.asyncContext = asyncContext;
this.msg = msg;
this.sequence = sequence;
}
Expand All @@ -136,38 +148,37 @@ public SequanceMessage(AsyncContext asyncContex, Message msg, int sequence) {
* Send the response.
*/
void sendResponse() {
this.asyncContex.sendResponse(this.msg);
this.asyncContext.sendResponse(this.msg);
}

/**
* Order by sequence number
*/
@Override
public int compareTo(SequanceMessage o) {
public int compareTo(final SequenceMessage o) {
return Integer.compare(this.sequence, o.sequence);
}

}

/**
* Send request in pipeline mode.
*/
void sendSequenceResponse(String groupId, String peerId, int seq, AsyncContext asyncContext, BizContext bizContext,
Message msg) {
void sendSequenceResponse(final String groupId, final String peerId, final int seq,
final AsyncContext asyncContext, final BizContext bizContext, final Message msg) {
final Connection connection = bizContext.getConnection();
final PeerRequestContext ctx = getPeerRequestContext(groupId, peerId, connection);
final PriorityQueue<SequanceMessage> respQueue = ctx.responseQueue;
final PriorityQueue<SequenceMessage> respQueue = ctx.responseQueue;
assert (respQueue != null);

synchronized (Utils.withLockObject(respQueue)) {
respQueue.add(new SequanceMessage(asyncContext, msg, seq));
respQueue.add(new SequenceMessage(asyncContext, msg, seq));

if (!ctx.hasTooManyPendingResponses()) {
while (!respQueue.isEmpty()) {
final SequanceMessage queuedPipelinedResponse = respQueue.peek();
final SequenceMessage queuedPipelinedResponse = respQueue.peek();

if (queuedPipelinedResponse.sequence != getNextRequiredSequence(groupId, peerId, connection)) {
//sequence mismatch, waiting for next response.
// sequence mismatch, waiting for next response.
break;
}
respQueue.remove();
Expand All @@ -181,34 +192,33 @@ void sendSequenceResponse(String groupId, String peerId, int seq, AsyncContext a
LOG.warn("Closed connection to peer {}/{}, because of too many pending responses, queued={}, max={}",
ctx.groupId, peerId, respQueue.size(), ctx.maxPendingResponses);
connection.close();
//Close the connection if there are too many pending responses in queue.
// Close the connection if there are too many pending responses in queue.
removePeerRequestContext(groupId, peerId);
}
}
}

static class PeerRequestContext {
class PeerRequestContext {

private final String groupId;
private final String peerId;

// Executor to run the requests
private DefaultEventExecutor executor;
private SingleThreadExecutor executor;
// The request sequence;
private int sequence;
//The required sequence to be sent.
// The required sequence to be sent.
private int nextRequiredSequence;
//The response queue,it's not thread-safe and protected by it self object monitor.
private final PriorityQueue<SequanceMessage> responseQueue;
// The response queue,it's not thread-safe and protected by it self object monitor.
private final PriorityQueue<SequenceMessage> responseQueue;

private final int maxPendingResponses;

public PeerRequestContext(final String groupId, final String peerId, int maxPendingResponses) {
public PeerRequestContext(final String groupId, final String peerId, final int maxPendingResponses) {
super();
this.peerId = peerId;
this.groupId = groupId;
this.executor = new DefaultEventExecutor(JRaftUtils.createThreadFactory(groupId + "/" + peerId
+ "-AppendEntriesThread"));
this.executor = APPEND_ENTRIES_EXECUTORS.next();

this.sequence = 0;
this.nextRequiredSequence = 0;
Expand All @@ -232,7 +242,6 @@ int getAndIncrementSequence() {
synchronized void destroy() {
if (this.executor != null) {
LOG.info("Destroyed peer request context for {}/{}", this.groupId, this.peerId);
this.executor.shutdownGracefully();
this.executor = null;
}
}
Expand All @@ -251,11 +260,11 @@ int getAndIncrementNextRequiredSequence() {
}
}

PeerRequestContext getPeerRequestContext(final String groupId, final String peerId, Connection conn) {
ConcurrentMap<String /* peerId */, PeerRequestContext> groupContexts = peerRequestContexts.get(groupId);
PeerRequestContext getPeerRequestContext(final String groupId, final String peerId, final Connection conn) {
ConcurrentMap<String/* peerId */, PeerRequestContext> groupContexts = this.peerRequestContexts.get(groupId);
if (groupContexts == null) {
groupContexts = new ConcurrentHashMap<>();
final ConcurrentMap<String, PeerRequestContext> existsCtxs = peerRequestContexts.putIfAbsent(groupId,
final ConcurrentMap<String, PeerRequestContext> existsCtxs = this.peerRequestContexts.putIfAbsent(groupId,
groupContexts);
if (existsCtxs != null) {
groupContexts = existsCtxs;
Expand All @@ -280,15 +289,16 @@ PeerRequestContext getPeerRequestContext(final String groupId, final String peer
}
}
}
//Set peer attribute into connection if absent
// Set peer attribute into connection if absent
if (conn != null && conn.getAttribute(PEER_ATTR) == null) {
conn.setAttribute(PEER_ATTR, peerId);
}
return peerCtx;
}

void removePeerRequestContext(final String groupId, final String peerId) {
final ConcurrentMap<String /* peerId */, PeerRequestContext> groupContexts = peerRequestContexts.get(groupId);
final ConcurrentMap<String/* peerId */, PeerRequestContext> groupContexts = this.peerRequestContexts
.get(groupId);
if (groupContexts == null) {
return;
}
Expand All @@ -302,43 +312,45 @@ void removePeerRequestContext(final String groupId, final String peerId) {

/**
* RAFT group peer request contexts
* Map<groupId, <peerId, ctx>>
*/
private final ConcurrentMap<String /* groupId */, ConcurrentMap<String /* peerId */, PeerRequestContext>> peerRequestContexts = new ConcurrentHashMap<>();
private final ConcurrentMap<String, ConcurrentMap<String, PeerRequestContext>> peerRequestContexts = new ConcurrentHashMap<>();

/**
* The executor selector to select executor for processing request.
*/
private final ExecutorSelector executorSelector;
private final ExecutorSelector executorSelector;

public AppendEntriesRequestProcessor(Executor executor) {
super(executor);
this.executorSelector = new PeerExecutorSelector();
}

@Override
protected String getPeerId(AppendEntriesRequest request) {
protected String getPeerId(final AppendEntriesRequest request) {
return request.getPeerId();
}

@Override
protected String getGroupId(AppendEntriesRequest request) {
protected String getGroupId(final AppendEntriesRequest request) {
return request.getGroupId();
}

private int getAndIncrementSequence(String groupId, String peerId, Connection conn) {
private int getAndIncrementSequence(final String groupId, final String peerId, final Connection conn) {
return getPeerRequestContext(groupId, peerId, conn).getAndIncrementSequence();
}

private int getNextRequiredSequence(String groupId, String peerId, Connection conn) {
private int getNextRequiredSequence(final String groupId, final String peerId, final Connection conn) {
return getPeerRequestContext(groupId, peerId, conn).getNextRequiredSequence();
}

private int getAndIncrementNextRequiredSequence(String groupId, String peerId, Connection conn) {
private int getAndIncrementNextRequiredSequence(final String groupId, final String peerId, final Connection conn) {
return getPeerRequestContext(groupId, peerId, conn).getAndIncrementNextRequiredSequence();
}

@Override
public Message processRequest0(RaftServerService service, AppendEntriesRequest request, RpcRequestClosure done) {
public Message processRequest0(final RaftServerService service, final AppendEntriesRequest request,
final RpcRequestClosure done) {

final Node node = (Node) service;

Expand Down Expand Up @@ -369,17 +381,17 @@ public ExecutorSelector getExecutorSelector() {
return this.executorSelector;
}

//TODO called when shutdown service.
// TODO called when shutdown service.
public void destroy() {
for (final ConcurrentMap<String /* peerId */, PeerRequestContext> map : this.peerRequestContexts.values()) {
for (final ConcurrentMap<String/* peerId */, PeerRequestContext> map : this.peerRequestContexts.values()) {
for (final PeerRequestContext ctx : map.values()) {
ctx.destroy();
}
}
}

@Override
public void onEvent(String remoteAddr, Connection conn) {
public void onEvent(final String remoteAddr, final Connection conn) {
final PeerId peer = new PeerId();
final String peerAttr = (String) conn.getAttribute(PEER_ATTR);

Expand All @@ -399,4 +411,17 @@ public void onEvent(String remoteAddr, Connection conn) {
LOG.info("Connection disconnected: {}", remoteAddr);
}
}

private static FixedThreadsExecutorGroup createGlobalAppendEntriesExecutors() {
// TODO do we need still use DefaultEventExecutor?
final DefaultEventExecutor[] executors = new DefaultEventExecutor[Utils.APPEND_ENTRIES_THREADS_RECV];
final ThreadFactory threadFactory = new NamedThreadFactory("Append-Entries-Thread-Recv", true);
final RejectedExecutionHandler rejectedHandler = RejectedExecutionHandlers.backoff(3, 100,
TimeUnit.MILLISECONDS);
for (int i = 0; i < executors.length; i++) {
executors[i] = new DefaultEventExecutor(null, threadFactory, Utils.MAX_APPEND_ENTRIES_TASKS_PER_THREAD,
rejectedHandler);
}
return DefaultFixedThreadsExecutorGroupFactory.INSTANCE.newExecutorGroup(executors);
}
}
Loading

0 comments on commit e79dda1

Please sign in to comment.