Skip to content

Commit

Permalink
(feat) add MpscSingleThreadExecutor and benchmark #168
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed May 24, 2019
1 parent dbf4eb4 commit d9a9396
Show file tree
Hide file tree
Showing 11 changed files with 530 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.RejectedExecutionHandler;
import io.netty.util.concurrent.RejectedExecutionHandlers;

import org.apache.commons.lang.StringUtils;

Expand All @@ -41,7 +35,6 @@
import com.alipay.sofa.jraft.rpc.RpcRequestClosure;
import com.alipay.sofa.jraft.rpc.RpcRequests.AppendEntriesRequest;
import com.alipay.sofa.jraft.rpc.RpcRequests.AppendEntriesRequestHeader;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.Utils;
import com.alipay.sofa.jraft.util.concurrent.DefaultFixedThreadsExecutorGroupFactory;
import com.alipay.sofa.jraft.util.concurrent.FixedThreadsExecutorGroup;
Expand All @@ -60,7 +53,12 @@ public class AppendEntriesRequestProcessor extends NodeRequestProcessor<AppendEn

static final String PEER_ATTR = "jraft-peer";

private static final FixedThreadsExecutorGroup APPEND_ENTRIES_EXECUTORS = createGlobalAppendEntriesExecutors();
private static final FixedThreadsExecutorGroup APPEND_ENTRIES_EXECUTORS = DefaultFixedThreadsExecutorGroupFactory.INSTANCE
.newExecutorGroup(
Utils.APPEND_ENTRIES_THREADS_RECV,
"Append-Entries-Thread-Recv",
Utils.MAX_APPEND_ENTRIES_TASKS_PER_THREAD,
true);

/**
* Peer executor selector.
Expand Down Expand Up @@ -411,17 +409,4 @@ public void onEvent(final String remoteAddr, final Connection conn) {
LOG.info("Connection disconnected: {}", remoteAddr);
}
}

private static FixedThreadsExecutorGroup createGlobalAppendEntriesExecutors() {
// TODO do we need still use DefaultEventExecutor?
final DefaultEventExecutor[] executors = new DefaultEventExecutor[Utils.APPEND_ENTRIES_THREADS_RECV];
final ThreadFactory threadFactory = new NamedThreadFactory("Append-Entries-Thread-Recv", true);
final RejectedExecutionHandler rejectedHandler = RejectedExecutionHandlers.backoff(3, 100,
TimeUnit.MILLISECONDS);
for (int i = 0; i < executors.length; i++) {
executors[i] = new DefaultEventExecutor(null, threadFactory, Utils.MAX_APPEND_ENTRIES_TASKS_PER_THREAD,
rejectedHandler);
}
return DefaultFixedThreadsExecutorGroupFactory.INSTANCE.newExecutorGroup(executors);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public class BoltRaftClientService extends AbstractBoltClientService implements
.newExecutorGroup(
Utils.APPEND_ENTRIES_THREADS_SEND,
"Append-Entries-Thread-Send",
Utils.MAX_APPEND_ENTRIES_TASKS_PER_THREAD);
Utils.MAX_APPEND_ENTRIES_TASKS_PER_THREAD,
true);

private final ConcurrentMap<Endpoint, Executor> appendEntriesExecutorMap = new ConcurrentHashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ public class Utils {
"jraft.max.append.entries.tasks.per.thread",
String.valueOf(32768)));

/**
* Whether use {@link com.alipay.sofa.jraft.util.concurrent.MpscSingleThreadExecutor}, true by default.
*/
public static final boolean USE_MPSC_SINGLE_THREAD_EXECUTOR = Boolean.parseBoolean(System.getProperty(
"jraft.use.mpsc.single.thread.executor",
"true"));

/**
* Global thread pool to run closure.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public SingleThreadExecutor next() {
}

@Override
public void execute(final int index, final Runnable command) {
this.chooser.select(index).execute(command);
public void execute(final int index, final Runnable task) {
this.chooser.select(index).execute(task);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
package com.alipay.sofa.jraft.util.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;

import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.Utils;

/**
*
Expand All @@ -31,10 +34,22 @@ public final class DefaultFixedThreadsExecutorGroupFactory implements FixedThrea
@Override
public FixedThreadsExecutorGroup newExecutorGroup(final int nThreads, final String poolName,
final int maxPendingTasksPerThread) {
return newExecutorGroup(nThreads, poolName, maxPendingTasksPerThread, false);
}

@Override
public FixedThreadsExecutorGroup newExecutorGroup(final int nThreads, final String poolName,
final int maxPendingTasksPerThread, final boolean useMpscQueue) {
Requires.requireTrue(nThreads > 0, "nThreads must > 0");
final boolean mpsc = useMpscQueue && Utils.USE_MPSC_SINGLE_THREAD_EXECUTOR;
final SingleThreadExecutor[] children = new SingleThreadExecutor[nThreads];
final ThreadFactory threadFactory = mpsc ? new NamedThreadFactory(poolName) : null;
for (int i = 0; i < nThreads; i++) {
children[i] = new DefaultSingleThreadExecutor(poolName, maxPendingTasksPerThread);
if (mpsc) {
children[i] = new MpscSingleThreadExecutor(maxPendingTasksPerThread, threadFactory);
} else {
children[i] = new DefaultSingleThreadExecutor(poolName, maxPendingTasksPerThread);
}
}
return new DefaultFixedThreadsExecutorGroup(children);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public DefaultSingleThreadExecutor(String poolName, int maxPendingTasks) {
}

@Override
public void execute(final Runnable command) {
this.singleThreadExecutor.execute(command);
public void execute(final Runnable task) {
this.singleThreadExecutor.execute(task);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ public interface FixedThreadsExecutorGroup extends Iterable<SingleThreadExecutor
SingleThreadExecutor next();

/**
* Executes the given command at some time in the future. The command
* Executes the given task at some time in the future. The task
* execute by a specified thread, which is selected by index.
*
* @param index index for thread chooser
* @param command the runnable task
* @param index index for thread chooser
* @param task the runnable task
*/
void execute(final int index, final Runnable command);
void execute(final int index, final Runnable task);

/**
* Shortcut method for {@link #shutdownGracefully(long, TimeUnit)} with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ public interface FixedThreadsExecutorGroupFactory {
FixedThreadsExecutorGroup newExecutorGroup(final int nThreads, final String poolName,
final int maxPendingTasksPerThread);

FixedThreadsExecutorGroup newExecutorGroup(final int nThreads, final String poolName,
final int maxPendingTasksPerThread, final boolean useMpscQueue);

FixedThreadsExecutorGroup newExecutorGroup(final SingleThreadExecutor[] children);

FixedThreadsExecutorGroup newExecutorGroup(final SingleThreadExecutor[] children,
Expand Down
Loading

0 comments on commit d9a9396

Please sign in to comment.