(feat) add FixedThreadsExecutorGroup #168 (PR #170)

Merged: 55 commits, Jul 5, 2019

Commits (55)
e79dda1  (feat) add FixedThreadsExecutorGroup #168 (fengjiachun, May 22, 2019)
4bd103d  (feat) rename method (fengjiachun, May 23, 2019)
c346125  (feat) add MpscSingleThreadExecutor and benchmark #168 (fengjiachun, May 23, 2019)
40d3c88  (fix) forget to warmup producers (fengjiachun, May 23, 2019)
6e7261e  (fix) fix some bugs and add unit test (fengjiachun, May 23, 2019)
4841007  (fix) add more unit test (fengjiachun, May 23, 2019)
aeaee1b  (fix) add more unit test (fengjiachun, May 23, 2019)
6fc0915  (fix) add more unit test (fengjiachun, May 23, 2019)
12cb67b  (fix) add some comments (fengjiachun, May 23, 2019)
ab3f7ab  (fix) unit test (fengjiachun, May 24, 2019)
7761b87  (fix) add some comments (fengjiachun, May 24, 2019)
5615060  (fix) refactoring Utils class (fengjiachun, May 24, 2019)
61f50d5  (fix) refactoring Utils class (fengjiachun, May 24, 2019)
84d2bc0  (fix) jraft.closure.threadpool.size.max update default value (fengjiachun, May 24, 2019)
3d552bf  (fix) fix unit test (fengjiachun, May 24, 2019)
cefc364  (fix) fix unit test (fengjiachun, May 24, 2019)
eac2674  (feat) refactor ThreadId and replicator (#169) (killme2008, May 22, 2019)
022f472  (feat) add pooled buf allocator (#161) (fengjiachun, May 22, 2019)
f262ff9  Add unit tests for com.alipay.sofa.jraft.util.BytesUtil (#166) (EricHetti, May 23, 2019)
72dc240  (fix) Utils.java format (fengjiachun, May 24, 2019)
83d6d51  (feat) add FixedThreadsExecutorGroup #168 (fengjiachun, May 22, 2019)
dbf4eb4  (feat) rename method (fengjiachun, May 23, 2019)
d9a9396  (feat) add MpscSingleThreadExecutor and benchmark #168 (fengjiachun, May 23, 2019)
729c396  (fix) forget to warmup producers (fengjiachun, May 23, 2019)
da4cdd1  (fix) fix some bugs and add unit test (fengjiachun, May 23, 2019)
9c63192  (fix) add more unit test (fengjiachun, May 23, 2019)
4c27d48  (fix) add more unit test (fengjiachun, May 23, 2019)
94e1a72  (fix) add more unit test (fengjiachun, May 23, 2019)
033cf7a  (fix) add some comments (fengjiachun, May 23, 2019)
01bfd48  (fix) unit test (fengjiachun, May 24, 2019)
296930f  (fix) add some comments (fengjiachun, May 24, 2019)
4926806  (fix) refactoring Utils class (fengjiachun, May 24, 2019)
6c900aa  (fix) refactoring Utils class (fengjiachun, May 24, 2019)
0116dfd  (fix) jraft.closure.threadpool.size.max update default value (fengjiachun, May 24, 2019)
9a839aa  (fix) fix unit test (fengjiachun, May 24, 2019)
6532952  (fix) fix unit test (fengjiachun, May 24, 2019)
ca71e65  (feat) add pooled buf allocator (#161) (fengjiachun, May 22, 2019)
1f09381  (fix) Utils.java format (fengjiachun, May 24, 2019)
84e4763  Merge branch 'master' into feat/event_loop (fengjiachun, May 24, 2019)
0cf77c5  (fix) Utils.java format (fengjiachun, May 24, 2019)
27eaa9f  (fix) fix bad key with executor map (fengjiachun, May 24, 2019)
fac9134  (fix) bad import (fengjiachun, May 24, 2019)
300f4ce  (fix) fix unit test (fengjiachun, May 25, 2019)
ea84892  (feat) add mor benchmark (fengjiachun, May 25, 2019)
e40f6d9  (fix) code format (fengjiachun, May 25, 2019)
fcdd47a  (fix) code format (fengjiachun, May 25, 2019)
1a525dd  (fix) benchmark with jmh (fengjiachun, May 25, 2019)
f63f5c1  (fix) benchmark with jmh (fengjiachun, May 25, 2019)
4ad9e34  (fix) set common daemon (fengjiachun, May 25, 2019)
bd2fa35  (fix) fix unit test (fengjiachun, May 25, 2019)
e18300a  (fix) should be no radical changes, especially if they are not fully … (fengjiachun, May 26, 2019)
7bb998d  (feat) add jctools (fengjiachun, Jun 3, 2019)
2da5c83  (feat) configure the number of processors #180 (#181) (fengjiachun, Jun 13, 2019)
69aaf68  Merge branch 'master' into feat/event_loop (fengjiachun, Jun 27, 2019)
0d2e1a0  (fix) format (fengjiachun, Jun 28, 2019)
Files changed

jraft-core/pom.xml (5 additions, 0 deletions)

@@ -36,6 +36,11 @@
       <groupId>org.rocksdb</groupId>
       <artifactId>rocksdbjni</artifactId>
     </dependency>
+    <!-- jctools -->
+    <dependency>
+      <groupId>org.jctools</groupId>
+      <artifactId>jctools-core</artifactId>
+    </dependency>
     <!-- mock -->
     <dependency>
       <groupId>org.mockito</groupId>
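The jctools dependency added above supplies the lock-free queues this PR builds on: the new MpscSingleThreadExecutor sits on an MPSC (multi-producer, single-consumer) queue, which matches a single-threaded executor's traffic exactly: any number of threads may enqueue tasks, but only the one worker thread dequeues. A minimal sketch of that contract against jctools directly (MpscUnboundedArrayQueue is real jctools API; the demo class itself is illustrative, not code from this PR):

import java.util.Queue;

import org.jctools.queues.MpscUnboundedArrayQueue;

public class MpscQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        // MPSC contract: many threads may offer(), but only ONE thread may poll().
        final Queue<Runnable> tasks = new MpscUnboundedArrayQueue<>(1024); // chunk size

        // Producers: concurrent enqueues from several threads are safe.
        for (int i = 0; i < 4; i++) {
            final int id = i;
            new Thread(() -> tasks.offer(() -> System.out.println("task from producer " + id))).start();
        }

        Thread.sleep(200);
        // The single consumer drains the queue; this is an executor worker loop in miniature.
        for (Runnable task; (task = tasks.poll()) != null; ) {
            task.run();
        }
    }
}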
JRaftUtils.java

@@ -88,7 +88,7 @@ public static Executor createExecutor(final String prefix, final int number) {
  * @since 0.0.3
  */
 public static ThreadFactory createThreadFactory(final String prefixName) {
-    return new NamedThreadFactory(prefixName);
+    return new NamedThreadFactory(prefixName, true);
 }

 /**
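The second argument that keeps appearing in NamedThreadFactory calls throughout this PR is a daemon flag: JRaft's internal threads are now created as daemon threads, so a pool that was never shut down can no longer keep the JVM alive after application threads exit. A hypothetical factory in the same spirit (a sketch, not the actual NamedThreadFactory source) shows what the flag changes:

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a named, daemon-aware thread factory; illustrative stand-in only.
public class DaemonNamedThreadFactory implements ThreadFactory {

    private final AtomicInteger counter = new AtomicInteger(0);
    private final String        prefix;
    private final boolean       daemon;

    public DaemonNamedThreadFactory(String prefix, boolean daemon) {
        this.prefix = prefix;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable r) {
        final Thread t = new Thread(r, this.prefix + this.counter.getAndIncrement());
        // Daemon threads never block JVM shutdown; with daemon=false, one forgotten
        // pool thread is enough to keep the process running after main() returns.
        t.setDaemon(this.daemon);
        return t;
    }
}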
TimerManager.java

@@ -37,7 +37,8 @@ public class TimerManager implements Lifecycle<Integer> {

 @Override
 public boolean init(Integer coreSize) {
-    executor = Executors.newScheduledThreadPool(coreSize, new NamedThreadFactory("JRaft-Node-ScheduleThreadPool-"));
+    executor = Executors.newScheduledThreadPool(coreSize, new NamedThreadFactory("JRaft-Node-ScheduleThreadPool-",
+        true));
     return true;
 }
AbstractBoltClientService.java

@@ -104,7 +104,7 @@ protected boolean initRpcClient(final int rpcProcessorThreadPoolSize) {
     .maximumThreads(rpcProcessorThreadPoolSize) //
     .keepAliveSeconds(60L) //
     .workQueue(new ArrayBlockingQueue<>(10000)) //
-    .threadFactory(new NamedThreadFactory("JRaft-RPC-Processor-")) //
+    .threadFactory(new NamedThreadFactory("JRaft-RPC-Processor-", true)) //
     .build();
 if (this.rpcOptions.getMetricRegistry() != null) {
     this.rpcOptions.getMetricRegistry().register("raft-rpc-client-thread-pool",

@@ -157,12 +157,25 @@ public boolean disconnect(final Endpoint endpoint) {
 @Override
 public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
                                                           final RpcResponseClosure<T> done, final int timeoutMs) {
-    return invokeWithDone(endpoint, request, this.defaultInvokeCtx, done, timeoutMs);
+    return invokeWithDone(endpoint, request, this.defaultInvokeCtx, done, timeoutMs, this.rpcExecutor);
 }

+public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
+                                                          final RpcResponseClosure<T> done, final int timeoutMs,
+                                                          final Executor rpcExecutor) {
+    return invokeWithDone(endpoint, request, this.defaultInvokeCtx, done, timeoutMs, rpcExecutor);
+}
+
 public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
                                                           final InvokeContext ctx,
                                                           final RpcResponseClosure<T> done, final int timeoutMs) {
+    return invokeWithDone(endpoint, request, ctx, done, timeoutMs, this.rpcExecutor);
+}
+
+public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
+                                                          final InvokeContext ctx,
+                                                          final RpcResponseClosure<T> done, final int timeoutMs,
+                                                          final Executor rpcExecutor) {
     final FutureImpl<Message> future = new FutureImpl<>();
     try {
         final Url rpcUrl = this.rpcAddressParser.parse(endpoint.toString());

@@ -221,7 +234,7 @@ public void onException(final Throwable e) {

 @Override
 public Executor getExecutor() {
-    return AbstractBoltClientService.this.rpcExecutor;
+    return rpcExecutor != null ? rpcExecutor : AbstractBoltClientService.this.rpcExecutor;
 }
 }, timeoutMs <= 0 ? this.rpcOptions.getRpcDefaultTimeout() : timeoutMs);
} catch (final InterruptedException e) {
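The new invokeWithDone overloads thread a per-call Executor down to the bolt callback: getExecutor() now prefers the executor supplied for this invocation and falls back to the service-wide rpcExecutor when none was given. The pattern in isolation, as a runnable sketch (class and names here are illustrative, not the PR's API):

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// Sketch of the per-call-executor-with-fallback pattern used by getExecutor().
public class CallbackExecutorDemo {

    private static final Executor SHARED_POOL = Executors.newFixedThreadPool(4);

    static Executor choose(Executor perCall) {
        // Mirrors: return rpcExecutor != null ? rpcExecutor : AbstractBoltClientService.this.rpcExecutor;
        return perCall != null ? perCall : SHARED_POOL;
    }

    public static void main(String[] args) {
        final Executor pinned = Executors.newSingleThreadExecutor();
        choose(pinned).execute(() -> System.out.println("runs on the per-call executor"));
        choose(null).execute(() -> System.out.println("falls back to the shared pool"));
    }
}

Pinning one executor per peer is what lets append-entries responses from a given follower be processed in submission order; the BoltRaftClientService changes at the end of this PR do exactly that.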
AppendEntriesRequestProcessor.java

@@ -37,10 +37,10 @@
 import com.alipay.sofa.jraft.rpc.RpcRequests.AppendEntriesRequest;
 import com.alipay.sofa.jraft.rpc.RpcRequests.AppendEntriesRequestHeader;
 import com.alipay.sofa.jraft.util.Utils;
+import com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor;
+import com.alipay.sofa.jraft.util.concurrent.SingleThreadExecutor;
 import com.google.protobuf.Message;

-import io.netty.util.concurrent.DefaultEventExecutor;
-
 /**
  * Append entries request processor.
  *

@@ -56,7 +56,6 @@ public class AppendEntriesRequestProcessor extends NodeRequestProcessor<AppendEn
 /**
  * Peer executor selector.
  * @author dennis
- *
  */
 final class PeerExecutorSelector implements ExecutorSelector {

@@ -65,7 +64,7 @@ final class PeerExecutorSelector implements ExecutorSelector {
 }

 @Override
-public Executor select(String requestClass, Object requestHeader) {
+public Executor select(final String requestClass, final Object requestHeader) {
     final AppendEntriesRequestHeader header = (AppendEntriesRequestHeader) requestHeader;
     final String groupId = header.getGroupId();
     final String peerId = header.getPeerId();

@@ -81,6 +80,7 @@ public Executor select(String requestClass, Object requestHeader) {
 if (node == null || !node.getRaftOptions().isReplicatorPipeline()) {
     return getExecutor();
 }
+
 // The node enable pipeline, we should ensure bolt support it.
 Utils.ensureBoltPipeline();

@@ -109,25 +109,23 @@ public SequenceRpcRequestClosure(RpcRequestClosure parent, int sequence, String
 }

 @Override
-public void sendResponse(Message msg) {
-    sendSequenceResponse(groupId, peerId, this.reqSequence, getAsyncContext(), getBizContext(), msg);
+public void sendResponse(final Message msg) {
+    sendSequenceResponse(this.groupId, this.peerId, this.reqSequence, getAsyncContext(), getBizContext(), msg);
 }

 }

 /**
- * Response message wrapper with a request sequence number and asyncContex.done
+ * Response message wrapper with a request sequence number and asyncContext.done
  * @author dennis
- *
  */
-static class SequanceMessage implements Comparable<SequanceMessage> {
+static class SequenceMessage implements Comparable<SequenceMessage> {
     public final Message msg;
     private final int sequence;
-    private final AsyncContext asyncContex;
+    private final AsyncContext asyncContext;

-    public SequanceMessage(AsyncContext asyncContex, Message msg, int sequence) {
+    public SequenceMessage(AsyncContext asyncContext, Message msg, int sequence) {
         super();
-        this.asyncContex = asyncContex;
+        this.asyncContext = asyncContext;
         this.msg = msg;
         this.sequence = sequence;
     }

@@ -136,38 +134,37 @@ public SequanceMessage(AsyncContext asyncContex, Message msg, int sequence) {
  * Send the response.
  */
 void sendResponse() {
-    this.asyncContex.sendResponse(this.msg);
+    this.asyncContext.sendResponse(this.msg);
 }

 /**
  * Order by sequence number
  */
 @Override
-public int compareTo(SequanceMessage o) {
+public int compareTo(final SequenceMessage o) {
     return Integer.compare(this.sequence, o.sequence);
 }

 }

 /**
  * Send request in pipeline mode.
  */
-void sendSequenceResponse(String groupId, String peerId, int seq, AsyncContext asyncContext, BizContext bizContext,
-                          Message msg) {
+void sendSequenceResponse(final String groupId, final String peerId, final int seq,
+                          final AsyncContext asyncContext, final BizContext bizContext, final Message msg) {
     final Connection connection = bizContext.getConnection();
     final PeerRequestContext ctx = getPeerRequestContext(groupId, peerId, connection);
-    final PriorityQueue<SequanceMessage> respQueue = ctx.responseQueue;
+    final PriorityQueue<SequenceMessage> respQueue = ctx.responseQueue;
     assert (respQueue != null);

     synchronized (Utils.withLockObject(respQueue)) {
-        respQueue.add(new SequanceMessage(asyncContext, msg, seq));
+        respQueue.add(new SequenceMessage(asyncContext, msg, seq));

         if (!ctx.hasTooManyPendingResponses()) {
             while (!respQueue.isEmpty()) {
-                final SequanceMessage queuedPipelinedResponse = respQueue.peek();
+                final SequenceMessage queuedPipelinedResponse = respQueue.peek();

                 if (queuedPipelinedResponse.sequence != getNextRequiredSequence(groupId, peerId, connection)) {
-                    //sequence mismatch, waiting for next response.
+                    // sequence mismatch, waiting for next response.
                     break;
                 }
                 respQueue.remove();

@@ -181,7 +178,7 @@ void sendSequenceResponse(String groupId, String peerId, int seq, AsyncContext a
 LOG.warn("Closed connection to peer {}/{}, because of too many pending responses, queued={}, max={}",
     ctx.groupId, peerId, respQueue.size(), ctx.maxPendingResponses);
 connection.close();
-//Close the connection if there are too many pending responses in queue.
+// Close the connection if there are too many pending responses in queue.
 removePeerRequestContext(groupId, peerId);
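Because responses complete on different threads, sendSequenceResponse can be reached out of sequence order; the PriorityQueue parks early arrivals, and the loop above flushes only while the smallest buffered sequence equals the peer's next required sequence. The reordering logic in isolation (a simplified model, without the connection and pending-limit handling of the real method):

import java.util.PriorityQueue;

// Simplified model of the pipelined-response reordering in sendSequenceResponse().
public class SequenceReorderDemo {

    private final PriorityQueue<Integer> respQueue = new PriorityQueue<>();
    private int nextRequiredSequence = 0;

    // Called in completion order, which may differ from sequence order.
    synchronized void onResponse(int seq) {
        this.respQueue.add(seq);
        // Flush while the head of the queue is exactly the sequence the peer expects next.
        while (!this.respQueue.isEmpty() && this.respQueue.peek() == this.nextRequiredSequence) {
            System.out.println("send response " + this.respQueue.remove());
            this.nextRequiredSequence++;
        }
    }

    public static void main(String[] args) {
        final SequenceReorderDemo demo = new SequenceReorderDemo();
        demo.onResponse(2); // buffered: 0 has not arrived yet
        demo.onResponse(0); // sends 0; 2 stays buffered because 1 is missing
        demo.onResponse(1); // sends 1, then 2
    }
}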
@@ -193,22 +190,22 @@ static class PeerRequestContext {
 private final String peerId;

 // Executor to run the requests
-private DefaultEventExecutor executor;
+private SingleThreadExecutor executor;
 // The request sequence;
 private int sequence;
-//The required sequence to be sent.
+// The required sequence to be sent.
 private int nextRequiredSequence;
-//The response queue,it's not thread-safe and protected by it self object monitor.
-private final PriorityQueue<SequanceMessage> responseQueue;
+// The response queue,it's not thread-safe and protected by it self object monitor.
+private final PriorityQueue<SequenceMessage> responseQueue;

 private final int maxPendingResponses;

-public PeerRequestContext(final String groupId, final String peerId, int maxPendingResponses) {
+public PeerRequestContext(final String groupId, final String peerId, final int maxPendingResponses) {
     super();
     this.peerId = peerId;
     this.groupId = groupId;
-    this.executor = new DefaultEventExecutor(JRaftUtils.createThreadFactory(groupId + "/" + peerId
-        + "-AppendEntriesThread"));
+    this.executor = new MpscSingleThreadExecutor(Utils.MAX_APPEND_ENTRIES_TASKS_PER_THREAD,
+        JRaftUtils.createThreadFactory(groupId + "/" + peerId + "-AppendEntriesThread"));

     this.sequence = 0;
     this.nextRequiredSequence = 0;

@@ -251,11 +248,11 @@ int getAndIncrementNextRequiredSequence() {
     }
 }

-PeerRequestContext getPeerRequestContext(final String groupId, final String peerId, Connection conn) {
-    ConcurrentMap<String /* peerId */, PeerRequestContext> groupContexts = peerRequestContexts.get(groupId);
+PeerRequestContext getPeerRequestContext(final String groupId, final String peerId, final Connection conn) {
+    ConcurrentMap<String/* peerId */, PeerRequestContext> groupContexts = this.peerRequestContexts.get(groupId);
     if (groupContexts == null) {
         groupContexts = new ConcurrentHashMap<>();
-        final ConcurrentMap<String, PeerRequestContext> existsCtxs = peerRequestContexts.putIfAbsent(groupId,
+        final ConcurrentMap<String, PeerRequestContext> existsCtxs = this.peerRequestContexts.putIfAbsent(groupId,
             groupContexts);
         if (existsCtxs != null) {
             groupContexts = existsCtxs;

@@ -280,15 +277,16 @@ PeerRequestContext getPeerRequestContext(final String groupId, final String peer
         }
     }
 }
-//Set peer attribute into connection if absent
+// Set peer attribute into connection if absent
 if (conn != null && conn.getAttribute(PEER_ATTR) == null) {
     conn.setAttribute(PEER_ATTR, peerId);
 }
 return peerCtx;
}

void removePeerRequestContext(final String groupId, final String peerId) {
-    final ConcurrentMap<String /* peerId */, PeerRequestContext> groupContexts = peerRequestContexts.get(groupId);
+    final ConcurrentMap<String/* peerId */, PeerRequestContext> groupContexts = this.peerRequestContexts
+        .get(groupId);
     if (groupContexts == null) {
         return;
     }

@@ -302,43 +300,45 @@ void removePeerRequestContext(final String groupId, final String peerId) {

 /**
  * RAFT group peer request contexts
+ * Map<groupId, <peerId, ctx>>
  */
-private final ConcurrentMap<String /* groupId */, ConcurrentMap<String /* peerId */, PeerRequestContext>> peerRequestContexts = new ConcurrentHashMap<>();
+private final ConcurrentMap<String, ConcurrentMap<String, PeerRequestContext>> peerRequestContexts = new ConcurrentHashMap<>();

 /**
  * The executor selector to select executor for processing request.
  */
-private final ExecutorSelector executorSelector;
+private final ExecutorSelector                                                  executorSelector;

 public AppendEntriesRequestProcessor(Executor executor) {
     super(executor);
     this.executorSelector = new PeerExecutorSelector();
 }

 @Override
-protected String getPeerId(AppendEntriesRequest request) {
+protected String getPeerId(final AppendEntriesRequest request) {
     return request.getPeerId();
 }

 @Override
-protected String getGroupId(AppendEntriesRequest request) {
+protected String getGroupId(final AppendEntriesRequest request) {
     return request.getGroupId();
 }

-private int getAndIncrementSequence(String groupId, String peerId, Connection conn) {
+private int getAndIncrementSequence(final String groupId, final String peerId, final Connection conn) {
     return getPeerRequestContext(groupId, peerId, conn).getAndIncrementSequence();
 }

-private int getNextRequiredSequence(String groupId, String peerId, Connection conn) {
+private int getNextRequiredSequence(final String groupId, final String peerId, final Connection conn) {
     return getPeerRequestContext(groupId, peerId, conn).getNextRequiredSequence();
 }

-private int getAndIncrementNextRequiredSequence(String groupId, String peerId, Connection conn) {
+private int getAndIncrementNextRequiredSequence(final String groupId, final String peerId, final Connection conn) {
     return getPeerRequestContext(groupId, peerId, conn).getAndIncrementNextRequiredSequence();
 }

 @Override
-public Message processRequest0(RaftServerService service, AppendEntriesRequest request, RpcRequestClosure done) {
+public Message processRequest0(final RaftServerService service, final AppendEntriesRequest request,
+                               final RpcRequestClosure done) {

 final Node node = (Node) service;

@@ -369,17 +369,17 @@ public ExecutorSelector getExecutorSelector() {
     return this.executorSelector;
 }

-//TODO called when shutdown service.
+// TODO called when shutdown service.
 public void destroy() {
-    for (final ConcurrentMap<String /* peerId */, PeerRequestContext> map : this.peerRequestContexts.values()) {
+    for (final ConcurrentMap<String/* peerId */, PeerRequestContext> map : this.peerRequestContexts.values()) {
         for (final PeerRequestContext ctx : map.values()) {
             ctx.destroy();
         }
     }
 }

 @Override
-public void onEvent(String remoteAddr, Connection conn) {
+public void onEvent(final String remoteAddr, final Connection conn) {
     final PeerId peer = new PeerId();
     final String peerAttr = (String) conn.getAttribute(PEER_ATTR);
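The PeerRequestContext change above swaps Netty's DefaultEventExecutor for the new MpscSingleThreadExecutor: the same single-threaded execution guarantee, but task hand-over goes through a jctools MPSC queue instead of Netty's event-executor machinery. A stripped-down sketch of the idea (the real class in com.alipay.sofa.jraft.util.concurrent adds proper shutdown, rejection handling and worker wake-up; the busy-wait below is for illustration only):

import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;

import org.jctools.queues.MpscChunkedArrayQueue;

// Illustrative single-thread executor over an MPSC queue; not the PR's implementation.
public class TinyMpscExecutor implements Executor {

    private final Queue<Runnable> taskQueue;
    private volatile boolean      running = true;

    public TinyMpscExecutor(int maxPendingTasks, ThreadFactory threadFactory) {
        this.taskQueue = new MpscChunkedArrayQueue<>(maxPendingTasks);
        threadFactory.newThread(() -> {
            // The single consumer: only this thread ever polls the queue.
            while (this.running) {
                final Runnable task = this.taskQueue.poll();
                if (task != null) {
                    task.run();
                } else {
                    Thread.yield(); // the real executor parks/unparks instead of spinning
                }
            }
        }).start();
    }

    @Override
    public void execute(Runnable command) {
        // Any thread may offer; the bounded queue enforces maxPendingTasks.
        if (!this.taskQueue.offer(command)) {
            throw new RejectedExecutionException("task queue is full");
        }
    }

    public void shutdown() {
        this.running = false;
    }
}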
BoltRaftClientService.java

@@ -16,6 +16,9 @@
  */
 package com.alipay.sofa.jraft.rpc.impl.core;

+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Future;

 import com.alipay.remoting.ConnectionEventType;

@@ -40,6 +43,9 @@
 import com.alipay.sofa.jraft.rpc.RpcResponseClosure;
 import com.alipay.sofa.jraft.rpc.impl.AbstractBoltClientService;
 import com.alipay.sofa.jraft.util.Endpoint;
+import com.alipay.sofa.jraft.util.Utils;
+import com.alipay.sofa.jraft.util.concurrent.DefaultFixedThreadsExecutorGroupFactory;
+import com.alipay.sofa.jraft.util.concurrent.FixedThreadsExecutorGroup;
 import com.google.protobuf.Message;

 /**

@@ -51,9 +57,18 @@
  */
 public class BoltRaftClientService extends AbstractBoltClientService implements RaftClientService {

+    private static final FixedThreadsExecutorGroup APPEND_ENTRIES_EXECUTORS = DefaultFixedThreadsExecutorGroupFactory.INSTANCE
+                                                                                .newExecutorGroup(
+                                                                                    Utils.APPEND_ENTRIES_THREADS_SEND,
+                                                                                    "Append-Entries-Thread-Send",
+                                                                                    Utils.MAX_APPEND_ENTRIES_TASKS_PER_THREAD,
+                                                                                    true);
+
+    private final ConcurrentMap<Endpoint, Executor> appendEntriesExecutorMap = new ConcurrentHashMap<>();
+
     // cached node options
-    private NodeOptions nodeOptions;
-    private final ReplicatorGroup rgGroup;
+    private NodeOptions                             nodeOptions;
+    private final ReplicatorGroup                   rgGroup;

     @Override
     protected void configRpcClient(final RpcClient rpcClient) {

@@ -89,7 +104,8 @@ public Future<Message> requestVote(final Endpoint endpoint, final RequestVoteReq
 @Override
 public Future<Message> appendEntries(final Endpoint endpoint, final AppendEntriesRequest request,
                                      final int timeoutMs, final RpcResponseClosure<AppendEntriesResponse> done) {
-    return invokeWithDone(endpoint, request, done, timeoutMs);
+    final Executor executor = this.appendEntriesExecutorMap.computeIfAbsent(endpoint, k -> APPEND_ENTRIES_EXECUTORS.next());
+    return invokeWithDone(endpoint, request, done, timeoutMs, executor);
 }

 @Override
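Here each endpoint is pinned to one executor out of a fixed group: computeIfAbsent assigns an executor via APPEND_ENTRIES_EXECUTORS.next() on first use, so all append-entries responses from one follower run on the same thread (preserving order) while the total thread count stays bounded regardless of cluster size. A sketch of a round-robin group with these assumed semantics for next() (the real FixedThreadsExecutorGroup hands out MPSC single-thread executors instead of JDK pools):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative fixed-size executor group with round-robin next().
public class RoundRobinExecutorGroup {

    private final Executor[]    children;
    private final AtomicInteger index = new AtomicInteger();

    public RoundRobinExecutorGroup(int nThreads) {
        this.children = new Executor[nThreads];
        for (int i = 0; i < nThreads; i++) {
            this.children[i] = Executors.newSingleThreadExecutor();
        }
    }

    public Executor next() {
        // Math.abs(x % n) keeps the index valid even after the counter overflows.
        return this.children[Math.abs(this.index.getAndIncrement() % this.children.length)];
    }

    public static void main(String[] args) {
        final RoundRobinExecutorGroup group = new RoundRobinExecutorGroup(4);
        final ConcurrentMap<String, Executor> byEndpoint = new ConcurrentHashMap<>();
        // As in appendEntries(): first use pins the endpoint to one child executor.
        final Executor first = byEndpoint.computeIfAbsent("10.0.0.1:8081", k -> group.next());
        final Executor again = byEndpoint.computeIfAbsent("10.0.0.1:8081", k -> group.next());
        System.out.println(first == again); // true: same endpoint, same executor
    }
}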