From e79dda11edb50037bfcc7c6bdb305109c78db81f Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Wed, 22 May 2019 21:24:50 +0800 Subject: [PATCH] (feat) add FixedThreadsExecutorGroup #168 --- .../rpc/impl/AbstractBoltClientService.java | 17 ++- .../core/AppendEntriesRequestProcessor.java | 127 +++++++++++------- .../rpc/impl/core/BoltRaftClientService.java | 21 ++- .../jraft}/util/ExecutorServiceHelper.java | 2 +- .../com/alipay/sofa/jraft}/util/Ints.java | 6 +- .../com/alipay/sofa/jraft/util/Utils.java | 74 +++++++--- .../DefaultExecutorChooserFactory.java | 81 +++++++++++ .../DefaultFixedThreadsExecutorGroup.java | 115 ++++++++++++++++ ...faultFixedThreadsExecutorGroupFactory.java | 66 +++++++++ .../DefaultSingleThreadExecutor.java | 111 +++++++++++++++ .../concurrent/ExecutorChooserFactory.java | 43 ++++++ .../concurrent/FixedThreadsExecutorGroup.java | 59 ++++++++ .../FixedThreadsExecutorGroupFactory.java | 40 ++++++ .../util/concurrent/SingleThreadExecutor.java | 44 ++++++ .../com/alipay/sofa/jraft/RouteTableTest.java | 14 +- .../com/alipay/sofa/jraft/core/NodeTest.java | 20 +-- .../sofa/jraft/core/ReadOnlyServiceTest.java | 12 +- .../sofa/jraft/entity/LogEntryTest.java | 14 +- .../alipay/sofa/jraft/entity/LogIdTest.java | 4 +- .../alipay/sofa/jraft/entity/PeerIdTest.java | 4 +- .../codec/BaseLogEntryCodecFactoryTest.java | 12 +- .../entity/codec/LogEntryCodecPerfTest.java | 10 +- .../codec/v2/LogEntryV2CodecFactoryTest.java | 10 +- .../rpc/AbstractBoltClientServiceTest.java | 18 +-- .../AppendEntriesRequestProcessorTest.java | 2 +- .../jraft/storage/SnapshotExecutorTest.java | 10 +- .../impl/LocalRaftMetaStorageTest.java | 8 +- .../jraft/storage/impl/LogManagerTest.java | 14 +- .../storage/impl/RocksDBLogStorageTest.java | 12 +- .../local/LocalSnapshotCopierTest.java | 12 +- .../alipay/sofa/jraft/util/CrcUtilTest.java | 4 +- .../alipay/sofa/jraft/util/EndpointTest.java | 4 +- .../rheakv/DistributedLockExample.java | 2 +- .../alipay/sofa/jraft/rhea/StoreEngine.java | 2 +- .../rhea/client/DefaultRheaKVRpcService.java | 2 +- .../pd/DefaultPlacementDriverRpcService.java | 2 +- .../jraft/rhea/client/pd/HeartbeatSender.java | 2 +- .../jraft/rhea/storage/MetricsRawKVStore.java | 2 +- .../jraft/rhea/util/ByteObjectHashMap.java | 2 + .../alipay/sofa/jraft/rhea/util/Lists.java | 1 + .../com/alipay/sofa/jraft/rhea/util/Maps.java | 1 + .../concurrent/disruptor/TaskDispatcher.java | 2 +- .../util/pipeline/DefaultHandlerInvoker.java | 2 +- .../jraft/rhea/chaos/AbstractChaosTest.java | 2 +- .../rhea/AbstractDistributedLockTest.java | 2 +- .../jraft/rhea/PlacementDriverServer.java | 2 +- 46 files changed, 831 insertions(+), 185 deletions(-) rename {jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea => jraft-core/src/main/java/com/alipay/sofa/jraft}/util/ExecutorServiceHelper.java (98%) rename {jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea => jraft-core/src/main/java/com/alipay/sofa/jraft}/util/Ints.java (96%) create mode 100644 jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/DefaultExecutorChooserFactory.java create mode 100644 jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/DefaultFixedThreadsExecutorGroup.java create mode 100644 jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/DefaultFixedThreadsExecutorGroupFactory.java create mode 100644 jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/DefaultSingleThreadExecutor.java create mode 100644 jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/ExecutorChooserFactory.java create mode 100644 jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/FixedThreadsExecutorGroup.java create mode 100644 jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/FixedThreadsExecutorGroupFactory.java create mode 100644 jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/SingleThreadExecutor.java diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractBoltClientService.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractBoltClientService.java index b1ad29f37..cd29c572c 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractBoltClientService.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractBoltClientService.java @@ -157,12 +157,25 @@ public boolean disconnect(final Endpoint endpoint) { @Override public Future invokeWithDone(final Endpoint endpoint, final Message request, final RpcResponseClosure done, final int timeoutMs) { - return invokeWithDone(endpoint, request, this.defaultInvokeCtx, done, timeoutMs); + return invokeWithDone(endpoint, request, this.defaultInvokeCtx, done, timeoutMs, this.rpcExecutor); + } + + public Future invokeWithDone(final Endpoint endpoint, final Message request, + final RpcResponseClosure done, final int timeoutMs, + final Executor rpcExecutor) { + return invokeWithDone(endpoint, request, this.defaultInvokeCtx, done, timeoutMs, rpcExecutor); } public Future invokeWithDone(final Endpoint endpoint, final Message request, final InvokeContext ctx, final RpcResponseClosure done, final int timeoutMs) { + return invokeWithDone(endpoint, request, ctx, done, timeoutMs, this.rpcExecutor); + } + + public Future invokeWithDone(final Endpoint endpoint, final Message request, + final InvokeContext ctx, + final RpcResponseClosure done, final int timeoutMs, + final Executor rpcExecutor) { final FutureImpl future = new FutureImpl<>(); try { final Url rpcUrl = this.rpcAddressParser.parse(endpoint.toString()); @@ -221,7 +234,7 @@ public void onException(final Throwable e) { @Override public Executor getExecutor() { - return AbstractBoltClientService.this.rpcExecutor; + return rpcExecutor != null ? rpcExecutor : AbstractBoltClientService.this.rpcExecutor; } }, timeoutMs <= 0 ? this.rpcOptions.getRpcDefaultTimeout() : timeoutMs); } catch (final InterruptedException e) { diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java index 3704c5ce8..913f61b27 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java @@ -21,6 +21,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import io.netty.util.concurrent.DefaultEventExecutor; +import io.netty.util.concurrent.RejectedExecutionHandler; +import io.netty.util.concurrent.RejectedExecutionHandlers; import org.apache.commons.lang.StringUtils; @@ -28,7 +34,6 @@ import com.alipay.remoting.BizContext; import com.alipay.remoting.Connection; import com.alipay.remoting.ConnectionEventProcessor; -import com.alipay.sofa.jraft.JRaftUtils; import com.alipay.sofa.jraft.Node; import com.alipay.sofa.jraft.NodeManager; import com.alipay.sofa.jraft.entity.PeerId; @@ -36,11 +41,13 @@ import com.alipay.sofa.jraft.rpc.RpcRequestClosure; import com.alipay.sofa.jraft.rpc.RpcRequests.AppendEntriesRequest; import com.alipay.sofa.jraft.rpc.RpcRequests.AppendEntriesRequestHeader; +import com.alipay.sofa.jraft.util.NamedThreadFactory; import com.alipay.sofa.jraft.util.Utils; +import com.alipay.sofa.jraft.util.concurrent.DefaultFixedThreadsExecutorGroupFactory; +import com.alipay.sofa.jraft.util.concurrent.FixedThreadsExecutorGroup; +import com.alipay.sofa.jraft.util.concurrent.SingleThreadExecutor; import com.google.protobuf.Message; -import io.netty.util.concurrent.DefaultEventExecutor; - /** * Append entries request processor. * @@ -51,21 +58,24 @@ public class AppendEntriesRequestProcessor extends NodeRequestProcessor implements ConnectionEventProcessor { - static final String PEER_ATTR = "jraft-peer"; + static final String PEER_ATTR = "jraft-peer"; + + private static final FixedThreadsExecutorGroup APPEND_ENTRIES_EXECUTORS = createGlobalAppendEntriesExecutors(); /** * Peer executor selector. * @author dennis - * */ final class PeerExecutorSelector implements ExecutorSelector { + private final ConcurrentMap executorMap = new ConcurrentHashMap<>(); + PeerExecutorSelector() { super(); } @Override - public Executor select(String requestClass, Object requestHeader) { + public Executor select(final String requestClass, final Object requestHeader) { final AppendEntriesRequestHeader header = (AppendEntriesRequestHeader) requestHeader; final String groupId = header.getGroupId(); final String peerId = header.getPeerId(); @@ -78,9 +88,13 @@ public Executor select(String requestClass, Object requestHeader) { final Node node = NodeManager.getInstance().get(groupId, peer); - if (node == null || !node.getRaftOptions().isReplicatorPipeline()) { + if (node == null) { return getExecutor(); } + if (!node.getRaftOptions().isReplicatorPipeline()) { + return this.executorMap.computeIfAbsent(node, s -> APPEND_ENTRIES_EXECUTORS.next()); + } + // The node enable pipeline, we should ensure bolt support it. Utils.ensureBoltPipeline(); @@ -109,25 +123,23 @@ public SequenceRpcRequestClosure(RpcRequestClosure parent, int sequence, String } @Override - public void sendResponse(Message msg) { - sendSequenceResponse(groupId, peerId, this.reqSequence, getAsyncContext(), getBizContext(), msg); + public void sendResponse(final Message msg) { + sendSequenceResponse(this.groupId, this.peerId, this.reqSequence, getAsyncContext(), getBizContext(), msg); } - } /** - * Response message wrapper with a request sequence number and asyncContex.done + * Response message wrapper with a request sequence number and asyncContext.done * @author dennis - * */ - static class SequanceMessage implements Comparable { + static class SequenceMessage implements Comparable { public final Message msg; private final int sequence; - private final AsyncContext asyncContex; + private final AsyncContext asyncContext; - public SequanceMessage(AsyncContext asyncContex, Message msg, int sequence) { + public SequenceMessage(AsyncContext asyncContext, Message msg, int sequence) { super(); - this.asyncContex = asyncContex; + this.asyncContext = asyncContext; this.msg = msg; this.sequence = sequence; } @@ -136,38 +148,37 @@ public SequanceMessage(AsyncContext asyncContex, Message msg, int sequence) { * Send the response. */ void sendResponse() { - this.asyncContex.sendResponse(this.msg); + this.asyncContext.sendResponse(this.msg); } /** * Order by sequence number */ @Override - public int compareTo(SequanceMessage o) { + public int compareTo(final SequenceMessage o) { return Integer.compare(this.sequence, o.sequence); } - } /** * Send request in pipeline mode. */ - void sendSequenceResponse(String groupId, String peerId, int seq, AsyncContext asyncContext, BizContext bizContext, - Message msg) { + void sendSequenceResponse(final String groupId, final String peerId, final int seq, + final AsyncContext asyncContext, final BizContext bizContext, final Message msg) { final Connection connection = bizContext.getConnection(); final PeerRequestContext ctx = getPeerRequestContext(groupId, peerId, connection); - final PriorityQueue respQueue = ctx.responseQueue; + final PriorityQueue respQueue = ctx.responseQueue; assert (respQueue != null); synchronized (Utils.withLockObject(respQueue)) { - respQueue.add(new SequanceMessage(asyncContext, msg, seq)); + respQueue.add(new SequenceMessage(asyncContext, msg, seq)); if (!ctx.hasTooManyPendingResponses()) { while (!respQueue.isEmpty()) { - final SequanceMessage queuedPipelinedResponse = respQueue.peek(); + final SequenceMessage queuedPipelinedResponse = respQueue.peek(); if (queuedPipelinedResponse.sequence != getNextRequiredSequence(groupId, peerId, connection)) { - //sequence mismatch, waiting for next response. + // sequence mismatch, waiting for next response. break; } respQueue.remove(); @@ -181,34 +192,33 @@ void sendSequenceResponse(String groupId, String peerId, int seq, AsyncContext a LOG.warn("Closed connection to peer {}/{}, because of too many pending responses, queued={}, max={}", ctx.groupId, peerId, respQueue.size(), ctx.maxPendingResponses); connection.close(); - //Close the connection if there are too many pending responses in queue. + // Close the connection if there are too many pending responses in queue. removePeerRequestContext(groupId, peerId); } } } - static class PeerRequestContext { + class PeerRequestContext { private final String groupId; private final String peerId; // Executor to run the requests - private DefaultEventExecutor executor; + private SingleThreadExecutor executor; // The request sequence; private int sequence; - //The required sequence to be sent. + // The required sequence to be sent. private int nextRequiredSequence; - //The response queue,it's not thread-safe and protected by it self object monitor. - private final PriorityQueue responseQueue; + // The response queue,it's not thread-safe and protected by it self object monitor. + private final PriorityQueue responseQueue; private final int maxPendingResponses; - public PeerRequestContext(final String groupId, final String peerId, int maxPendingResponses) { + public PeerRequestContext(final String groupId, final String peerId, final int maxPendingResponses) { super(); this.peerId = peerId; this.groupId = groupId; - this.executor = new DefaultEventExecutor(JRaftUtils.createThreadFactory(groupId + "/" + peerId - + "-AppendEntriesThread")); + this.executor = APPEND_ENTRIES_EXECUTORS.next(); this.sequence = 0; this.nextRequiredSequence = 0; @@ -232,7 +242,6 @@ int getAndIncrementSequence() { synchronized void destroy() { if (this.executor != null) { LOG.info("Destroyed peer request context for {}/{}", this.groupId, this.peerId); - this.executor.shutdownGracefully(); this.executor = null; } } @@ -251,11 +260,11 @@ int getAndIncrementNextRequiredSequence() { } } - PeerRequestContext getPeerRequestContext(final String groupId, final String peerId, Connection conn) { - ConcurrentMap groupContexts = peerRequestContexts.get(groupId); + PeerRequestContext getPeerRequestContext(final String groupId, final String peerId, final Connection conn) { + ConcurrentMap groupContexts = this.peerRequestContexts.get(groupId); if (groupContexts == null) { groupContexts = new ConcurrentHashMap<>(); - final ConcurrentMap existsCtxs = peerRequestContexts.putIfAbsent(groupId, + final ConcurrentMap existsCtxs = this.peerRequestContexts.putIfAbsent(groupId, groupContexts); if (existsCtxs != null) { groupContexts = existsCtxs; @@ -280,7 +289,7 @@ PeerRequestContext getPeerRequestContext(final String groupId, final String peer } } } - //Set peer attribute into connection if absent + // Set peer attribute into connection if absent if (conn != null && conn.getAttribute(PEER_ATTR) == null) { conn.setAttribute(PEER_ATTR, peerId); } @@ -288,7 +297,8 @@ PeerRequestContext getPeerRequestContext(final String groupId, final String peer } void removePeerRequestContext(final String groupId, final String peerId) { - final ConcurrentMap groupContexts = peerRequestContexts.get(groupId); + final ConcurrentMap groupContexts = this.peerRequestContexts + .get(groupId); if (groupContexts == null) { return; } @@ -302,13 +312,14 @@ void removePeerRequestContext(final String groupId, final String peerId) { /** * RAFT group peer request contexts + * Map> */ - private final ConcurrentMap> peerRequestContexts = new ConcurrentHashMap<>(); + private final ConcurrentMap> peerRequestContexts = new ConcurrentHashMap<>(); /** * The executor selector to select executor for processing request. */ - private final ExecutorSelector executorSelector; + private final ExecutorSelector executorSelector; public AppendEntriesRequestProcessor(Executor executor) { super(executor); @@ -316,29 +327,30 @@ public AppendEntriesRequestProcessor(Executor executor) { } @Override - protected String getPeerId(AppendEntriesRequest request) { + protected String getPeerId(final AppendEntriesRequest request) { return request.getPeerId(); } @Override - protected String getGroupId(AppendEntriesRequest request) { + protected String getGroupId(final AppendEntriesRequest request) { return request.getGroupId(); } - private int getAndIncrementSequence(String groupId, String peerId, Connection conn) { + private int getAndIncrementSequence(final String groupId, final String peerId, final Connection conn) { return getPeerRequestContext(groupId, peerId, conn).getAndIncrementSequence(); } - private int getNextRequiredSequence(String groupId, String peerId, Connection conn) { + private int getNextRequiredSequence(final String groupId, final String peerId, final Connection conn) { return getPeerRequestContext(groupId, peerId, conn).getNextRequiredSequence(); } - private int getAndIncrementNextRequiredSequence(String groupId, String peerId, Connection conn) { + private int getAndIncrementNextRequiredSequence(final String groupId, final String peerId, final Connection conn) { return getPeerRequestContext(groupId, peerId, conn).getAndIncrementNextRequiredSequence(); } @Override - public Message processRequest0(RaftServerService service, AppendEntriesRequest request, RpcRequestClosure done) { + public Message processRequest0(final RaftServerService service, final AppendEntriesRequest request, + final RpcRequestClosure done) { final Node node = (Node) service; @@ -369,9 +381,9 @@ public ExecutorSelector getExecutorSelector() { return this.executorSelector; } - //TODO called when shutdown service. + // TODO called when shutdown service. public void destroy() { - for (final ConcurrentMap map : this.peerRequestContexts.values()) { + for (final ConcurrentMap map : this.peerRequestContexts.values()) { for (final PeerRequestContext ctx : map.values()) { ctx.destroy(); } @@ -379,7 +391,7 @@ public void destroy() { } @Override - public void onEvent(String remoteAddr, Connection conn) { + public void onEvent(final String remoteAddr, final Connection conn) { final PeerId peer = new PeerId(); final String peerAttr = (String) conn.getAttribute(PEER_ATTR); @@ -399,4 +411,17 @@ public void onEvent(String remoteAddr, Connection conn) { LOG.info("Connection disconnected: {}", remoteAddr); } } + + private static FixedThreadsExecutorGroup createGlobalAppendEntriesExecutors() { + // TODO do we need still use DefaultEventExecutor? + final DefaultEventExecutor[] executors = new DefaultEventExecutor[Utils.APPEND_ENTRIES_THREADS_RECV]; + final ThreadFactory threadFactory = new NamedThreadFactory("Append-Entries-Thread-Recv", true); + final RejectedExecutionHandler rejectedHandler = RejectedExecutionHandlers.backoff(3, 100, + TimeUnit.MILLISECONDS); + for (int i = 0; i < executors.length; i++) { + executors[i] = new DefaultEventExecutor(null, threadFactory, Utils.MAX_APPEND_ENTRIES_TASKS_PER_THREAD, + rejectedHandler); + } + return DefaultFixedThreadsExecutorGroupFactory.INSTANCE.newExecutorGroup(executors); + } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/BoltRaftClientService.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/BoltRaftClientService.java index c6c1dbd89..db167f653 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/BoltRaftClientService.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/BoltRaftClientService.java @@ -16,6 +16,9 @@ */ package com.alipay.sofa.jraft.rpc.impl.core; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; import java.util.concurrent.Future; import com.alipay.remoting.ConnectionEventType; @@ -40,6 +43,9 @@ import com.alipay.sofa.jraft.rpc.RpcResponseClosure; import com.alipay.sofa.jraft.rpc.impl.AbstractBoltClientService; import com.alipay.sofa.jraft.util.Endpoint; +import com.alipay.sofa.jraft.util.Utils; +import com.alipay.sofa.jraft.util.concurrent.DefaultFixedThreadsExecutorGroupFactory; +import com.alipay.sofa.jraft.util.concurrent.FixedThreadsExecutorGroup; import com.google.protobuf.Message; /** @@ -51,9 +57,17 @@ */ public class BoltRaftClientService extends AbstractBoltClientService implements RaftClientService { + private static final FixedThreadsExecutorGroup APPEND_ENTRIES_EXECUTORS = DefaultFixedThreadsExecutorGroupFactory.INSTANCE + .newExecutorGroup( + Utils.APPEND_ENTRIES_THREADS_SEND, + "Append-Entries-Thread-Send", + Utils.MAX_APPEND_ENTRIES_TASKS_PER_THREAD); + + private final ConcurrentMap appendEntriesExecutorMap = new ConcurrentHashMap<>(); + // cached node options - private NodeOptions nodeOptions; - private final ReplicatorGroup rgGroup; + private NodeOptions nodeOptions; + private final ReplicatorGroup rgGroup; @Override protected void configRpcClient(final RpcClient rpcClient) { @@ -89,7 +103,8 @@ public Future requestVote(final Endpoint endpoint, final RequestVoteReq @Override public Future appendEntries(final Endpoint endpoint, final AppendEntriesRequest request, final int timeoutMs, final RpcResponseClosure done) { - return invokeWithDone(endpoint, request, done, timeoutMs); + final Executor executor = this.appendEntriesExecutorMap.computeIfAbsent(endpoint, k -> APPEND_ENTRIES_EXECUTORS.next()); + return invokeWithDone(endpoint, request, done, timeoutMs, executor); } @Override diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/ExecutorServiceHelper.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/ExecutorServiceHelper.java similarity index 98% rename from jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/ExecutorServiceHelper.java rename to jraft-core/src/main/java/com/alipay/sofa/jraft/util/ExecutorServiceHelper.java index ec4980011..48edae4a6 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/ExecutorServiceHelper.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/ExecutorServiceHelper.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.alipay.sofa.jraft.rhea.util; +package com.alipay.sofa.jraft.util; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/Ints.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Ints.java similarity index 96% rename from jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/Ints.java rename to jraft-core/src/main/java/com/alipay/sofa/jraft/util/Ints.java index 2fc2ab5a7..375e57239 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/Ints.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Ints.java @@ -14,9 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.alipay.sofa.jraft.rhea.util; - -import com.alipay.sofa.jraft.util.Requires; +package com.alipay.sofa.jraft.util; /** * Static utility methods pertaining to {@code int} primitives. @@ -34,7 +32,7 @@ public final class Ints { * Returns the {@code int} value that is equal to {@code value}, if possible. */ public static int checkedCast(final long value) { - int result = (int) value; + final int result = (int) value; Requires.requireTrue(result == value, "out of range: " + value); return result; } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Utils.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Utils.java index 9a52c86a6..4b73b303a 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Utils.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Utils.java @@ -48,39 +48,71 @@ */ public class Utils { - private static final Logger LOG = LoggerFactory.getLogger(Utils.class); + private static final Logger LOG = LoggerFactory.getLogger(Utils.class); /** * Default jraft closure executor pool minimum size, CPUs by default. */ - public static final int MIN_CLOSURE_EXECUTOR_POOL_SIZE = Integer.parseInt(System.getProperty( - "jraft.closure.threadpool.size.min", - String.valueOf(cpus()))); + public static final int MIN_CLOSURE_EXECUTOR_POOL_SIZE = Integer.parseInt(System.getProperty( + "jraft.closure.threadpool.size.min", + String.valueOf(cpus()))); /** * Default jraft closure executor pool maximum size, 5*CPUs by default. */ - public static final int MAX_CLOSURE_EXECUTOR_POOL_SIZE = Integer.parseInt(System.getProperty( - "jraft.closure.threadpool.size.max", - String.valueOf(cpus() * 100))); + public static final int MAX_CLOSURE_EXECUTOR_POOL_SIZE = Integer.parseInt(System.getProperty( + "jraft.closure.threadpool.size.max", + String.valueOf(cpus() * 100))); + + /** + * Default jraft append-entries executor(send) pool size, 2*CPUs by default. + */ + public static final int APPEND_ENTRIES_THREADS_SEND = Integer + .parseInt(System + .getProperty( + "jraft.append.entries.threads.send", + String.valueOf(Ints + .findNextPositivePowerOfTwo(cpus() * 2)))); + + /** + * Default jraft append-entries executor(receive) pool size, 2*CPUs by default. + */ + public static final int APPEND_ENTRIES_THREADS_RECV = Integer + .parseInt(System + .getProperty( + "jraft.append.entries.threads.recv", + String.valueOf(Ints + .findNextPositivePowerOfTwo(cpus() * 2)))); + + /** + * Default jraft max pending tasks of append-entries per thread, 65536 by default. + */ + public static final int MAX_APPEND_ENTRIES_TASKS_PER_THREAD = Integer + .parseInt(System + .getProperty( + "jraft.max.append.entries.tasks.per.thread", + String.valueOf(32768))); /** * Global thread pool to run closure. */ - private static ThreadPoolExecutor CLOSURE_EXECUTOR = ThreadPoolUtil - .newBuilder() - .poolName("JRAFT_CLOSURE_EXECUTOR") - .enableMetric(true) - .coreThreads(MIN_CLOSURE_EXECUTOR_POOL_SIZE) - .maximumThreads(MAX_CLOSURE_EXECUTOR_POOL_SIZE) - .keepAliveSeconds(60L) - .workQueue(new SynchronousQueue<>()) - .threadFactory( - new NamedThreadFactory( - "JRaft-Closure-Executor-", true)) - .build(); - - private static final Pattern GROUP_ID_PATTER = Pattern.compile("^[a-zA-Z][a-zA-Z0-9\\-_]*$"); + private static ThreadPoolExecutor CLOSURE_EXECUTOR = ThreadPoolUtil + .newBuilder() + .poolName("JRAFT_CLOSURE_EXECUTOR") + .enableMetric(true) + .coreThreads( + MIN_CLOSURE_EXECUTOR_POOL_SIZE) + .maximumThreads( + MAX_CLOSURE_EXECUTOR_POOL_SIZE) + .keepAliveSeconds(60L) + .workQueue(new SynchronousQueue<>()) + .threadFactory( + new NamedThreadFactory( + "JRaft-Closure-Executor-", true)) + .build(); + + private static final Pattern GROUP_ID_PATTER = Pattern + .compile("^[a-zA-Z][a-zA-Z0-9\\-_]*$"); public static void verifyGroupId(String groupId) { if (StringUtils.isBlank(groupId)) { diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/DefaultExecutorChooserFactory.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/DefaultExecutorChooserFactory.java new file mode 100644 index 000000000..34d1ef5bc --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/DefaultExecutorChooserFactory.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util.concurrent; + +import java.util.concurrent.atomic.AtomicInteger; + +import com.alipay.sofa.jraft.util.Ints; + +/** + * + * @author jiachun.fjc + */ +public final class DefaultExecutorChooserFactory implements ExecutorChooserFactory { + + public static final DefaultExecutorChooserFactory INSTANCE = new DefaultExecutorChooserFactory(); + + @Override + public ExecutorChooser newChooser(final SingleThreadExecutor[] executors) { + if (Ints.isPowerOfTwo(executors.length)) { + return new PowerOfTwoExecutorChooser(executors); + } else { + return new GenericExecutorChooser(executors); + } + } + + private DefaultExecutorChooserFactory() { + } + + private static class PowerOfTwoExecutorChooser extends AbstractExecutorChooser { + + PowerOfTwoExecutorChooser(SingleThreadExecutor[] executors) { + super(executors); + } + + @Override + public SingleThreadExecutor select(final int index) { + return this.executors[index & this.executors.length - 1]; + } + } + + private static class GenericExecutorChooser extends AbstractExecutorChooser { + + protected GenericExecutorChooser(SingleThreadExecutor[] executors) { + super(executors); + } + + @Override + public SingleThreadExecutor select(final int index) { + return this.executors[Math.abs(index % this.executors.length)]; + } + } + + private static abstract class AbstractExecutorChooser implements ExecutorChooser { + + protected final AtomicInteger idx = new AtomicInteger(); + protected final SingleThreadExecutor[] executors; + + protected AbstractExecutorChooser(SingleThreadExecutor[] executors) { + this.executors = executors; + } + + @Override + public SingleThreadExecutor next() { + return select(this.idx.getAndIncrement()); + } + } +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/DefaultFixedThreadsExecutorGroup.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/DefaultFixedThreadsExecutorGroup.java new file mode 100644 index 000000000..43307aebc --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/DefaultFixedThreadsExecutorGroup.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util.concurrent; + +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * + * @author jiachun.fjc + */ +public final class DefaultFixedThreadsExecutorGroup implements FixedThreadsExecutorGroup { + + private final SingleThreadExecutor[] children; + private final Set readonlyChildren; + private final ExecutorChooserFactory.ExecutorChooser chooser; + + public DefaultFixedThreadsExecutorGroup(SingleThreadExecutor[] children) { + this(children, DefaultExecutorChooserFactory.INSTANCE.newChooser(children)); + } + + public DefaultFixedThreadsExecutorGroup(SingleThreadExecutor[] children, + ExecutorChooserFactory.ExecutorChooser chooser) { + this.children = children; + this.readonlyChildren = toUnmodifiableSet(this.children); + this.chooser = chooser; + } + + public DefaultFixedThreadsExecutorGroup(ExecutorService[] executors) { + this.children = toSingleThreadExecutors(executors); + this.readonlyChildren = toUnmodifiableSet(this.children); + this.chooser = DefaultExecutorChooserFactory.INSTANCE.newChooser(this.children); + } + + public DefaultFixedThreadsExecutorGroup(ExecutorService[] executors, ExecutorChooserFactory.ExecutorChooser chooser) { + this.children = toSingleThreadExecutors(executors); + this.readonlyChildren = toUnmodifiableSet(this.children); + this.chooser = chooser; + } + + @Override + public SingleThreadExecutor next() { + return this.chooser.next(); + } + + @Override + public void execute(final int index, final Runnable command) { + this.chooser.select(index).execute(command); + } + + @Override + public boolean shutdownGracefully() { + boolean success = true; + for (final SingleThreadExecutor c : this.children) { + success = success && c.shutdownGracefully(); + } + return success; + } + + @Override + public boolean shutdownGracefully(final long timeout, final TimeUnit unit) { + boolean success = true; + final long timeoutNanos = unit.toNanos(timeout); + final long start = System.nanoTime(); + for (final SingleThreadExecutor c : this.children) { + success = success && c.shutdownGracefully(timeout, unit); + if (System.nanoTime() - start > timeoutNanos) { + success = false; + break; + } + } + return success; + } + + @Override + public Iterator iterator() { + return this.readonlyChildren.iterator(); + } + + private static SingleThreadExecutor[] toSingleThreadExecutors(final ExecutorService[] executors) { + final SingleThreadExecutor[] array = new SingleThreadExecutor[executors.length]; + for (int i = 0; i < executors.length; i++) { + if (executors[i] instanceof SingleThreadExecutor) { + array[i] = (SingleThreadExecutor) executors[i]; + } else { + array[i] = new DefaultSingleThreadExecutor(executors[i]); + } + } + return array; + } + + private static Set toUnmodifiableSet(final SingleThreadExecutor[] children) { + final Set tmp = new LinkedHashSet<>(); + Collections.addAll(tmp, children); + return Collections.unmodifiableSet(tmp); + } +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/DefaultFixedThreadsExecutorGroupFactory.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/DefaultFixedThreadsExecutorGroupFactory.java new file mode 100644 index 000000000..0ee82945e --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/DefaultFixedThreadsExecutorGroupFactory.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util.concurrent; + +import java.util.concurrent.ExecutorService; + +import com.alipay.sofa.jraft.util.Requires; + +/** + * + * @author jiachun.fjc + */ +public final class DefaultFixedThreadsExecutorGroupFactory implements FixedThreadsExecutorGroupFactory { + + public static final DefaultFixedThreadsExecutorGroupFactory INSTANCE = new DefaultFixedThreadsExecutorGroupFactory(); + + @Override + public FixedThreadsExecutorGroup newExecutorGroup(final int nThreads, final String poolName, + final int maxPendingTasksPerThread) { + Requires.requireTrue(nThreads > 0, "nThreads must > 0"); + final SingleThreadExecutor[] children = new SingleThreadExecutor[nThreads]; + for (int i = 0; i < nThreads; i++) { + children[i] = new DefaultSingleThreadExecutor(poolName, maxPendingTasksPerThread); + } + return new DefaultFixedThreadsExecutorGroup(children); + } + + @Override + public FixedThreadsExecutorGroup newExecutorGroup(final SingleThreadExecutor[] children) { + return new DefaultFixedThreadsExecutorGroup(children); + } + + @Override + public FixedThreadsExecutorGroup newExecutorGroup(final SingleThreadExecutor[] children, + final ExecutorChooserFactory.ExecutorChooser chooser) { + return new DefaultFixedThreadsExecutorGroup(children, chooser); + } + + @Override + public FixedThreadsExecutorGroup newExecutorGroup(final ExecutorService[] children) { + return new DefaultFixedThreadsExecutorGroup(children); + } + + @Override + public FixedThreadsExecutorGroup newExecutorGroup(final ExecutorService[] children, + final ExecutorChooserFactory.ExecutorChooser chooser) { + return new DefaultFixedThreadsExecutorGroup(children, chooser); + } + + private DefaultFixedThreadsExecutorGroupFactory() { + } +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/DefaultSingleThreadExecutor.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/DefaultSingleThreadExecutor.java new file mode 100644 index 000000000..fbe9a14b5 --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/DefaultSingleThreadExecutor.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util.concurrent; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import com.alipay.sofa.jraft.util.ExecutorServiceHelper; +import com.alipay.sofa.jraft.util.NamedThreadFactory; +import com.alipay.sofa.jraft.util.ThreadPoolUtil; + +/** + * + * @author jiachun.fjc + */ +public final class DefaultSingleThreadExecutor implements SingleThreadExecutor { + + private final SingleThreadExecutor singleThreadExecutor; + + public DefaultSingleThreadExecutor(ExecutorService singleThreadExecutorService) { + this.singleThreadExecutor = toSingleThreadExecutor(singleThreadExecutorService); + } + + public DefaultSingleThreadExecutor(String poolName, int maxPendingTasks) { + this.singleThreadExecutor = createSingleThreadExecutor(poolName, maxPendingTasks); + } + + @Override + public void execute(final Runnable command) { + this.singleThreadExecutor.execute(command); + } + + @Override + public boolean shutdownGracefully() { + return this.singleThreadExecutor.shutdownGracefully(); + } + + @Override + public boolean shutdownGracefully(final long timeout, final TimeUnit unit) { + return this.singleThreadExecutor.shutdownGracefully(timeout, unit); + } + + private static SingleThreadExecutor toSingleThreadExecutor(final ExecutorService executor) { + if (executor instanceof SingleThreadExecutor) { + return (SingleThreadExecutor) executor; + } else { + return new SingleThreadExecutor() { + + @Override + public boolean shutdownGracefully() { + return ExecutorServiceHelper.shutdownAndAwaitTermination(executor); + } + + @Override + public boolean shutdownGracefully(final long timeout, final TimeUnit unit) { + return ExecutorServiceHelper.shutdownAndAwaitTermination(executor, unit.toMillis(timeout)); + } + + @Override + public void execute(final Runnable command) { + executor.execute(command); + } + }; + } + } + + private static SingleThreadExecutor createSingleThreadExecutor(final String poolName, final int maxPendingTasks) { + final ExecutorService singleThreadPool = ThreadPoolUtil.newBuilder() // + .poolName(poolName) // + .enableMetric(true) // + .coreThreads(1) // + .maximumThreads(1) // + .keepAliveSeconds(60L) // + .workQueue(new LinkedBlockingQueue<>(maxPendingTasks)) // + .threadFactory(new NamedThreadFactory(poolName, true)) // + .build(); + + return new SingleThreadExecutor() { + + @Override + public boolean shutdownGracefully() { + return ExecutorServiceHelper.shutdownAndAwaitTermination(singleThreadPool); + } + + @Override + public boolean shutdownGracefully(final long timeout, final TimeUnit unit) { + return ExecutorServiceHelper.shutdownAndAwaitTermination(singleThreadPool, unit.toMillis(timeout)); + } + + @Override + public void execute(final Runnable command) { + singleThreadPool.execute(command); + } + }; + } +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/ExecutorChooserFactory.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/ExecutorChooserFactory.java new file mode 100644 index 000000000..8058d5129 --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/ExecutorChooserFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util.concurrent; + +/** + * Factory that creates new {@link ExecutorChooser}s. + * + * @author jiachun.fjc + */ +public interface ExecutorChooserFactory { + + /** + * Returns a new {@link ExecutorChooser}. + */ + ExecutorChooser newChooser(final SingleThreadExecutor[] executors); + + interface ExecutorChooser { + + /** + * Returns the next {@link SingleThreadExecutor} to use. + */ + SingleThreadExecutor next(); + + /** + * Returns the chosen {@link SingleThreadExecutor} to use. + */ + SingleThreadExecutor select(final int index); + } +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/FixedThreadsExecutorGroup.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/FixedThreadsExecutorGroup.java new file mode 100644 index 000000000..e43228912 --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/FixedThreadsExecutorGroup.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util.concurrent; + +import java.util.concurrent.TimeUnit; + +/** + * + * @author jiachun.fjc + */ +public interface FixedThreadsExecutorGroup extends Iterable { + + /** + * Returns one of the {@link SingleThreadExecutor}s managed by this + * {@link FixedThreadsExecutorGroup}. + */ + SingleThreadExecutor next(); + + /** + * Executes the given command at some time in the future. The command + * execute by a specified thread, which is selected by index. + * + * @param index index for thread chooser + * @param command the runnable task + */ + void execute(final int index, final Runnable command); + + /** + * Shortcut method for {@link #shutdownGracefully(long, TimeUnit)} with + * sensible default values. + * + * @return true if success to shutdown + */ + boolean shutdownGracefully(); + + /** + * Signals all executors that the caller wants them to be shutdown. + * + * @param timeout the maximum amount of time to wait until the executor + * is shutdown + * @param unit the unit of {@code timeout} + * @return true if success to shutdown + */ + boolean shutdownGracefully(final long timeout, final TimeUnit unit); +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/FixedThreadsExecutorGroupFactory.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/FixedThreadsExecutorGroupFactory.java new file mode 100644 index 000000000..ac22126f6 --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/FixedThreadsExecutorGroupFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util.concurrent; + +import java.util.concurrent.ExecutorService; + +/** + * + * @author jiachun.fjc + */ +public interface FixedThreadsExecutorGroupFactory { + + FixedThreadsExecutorGroup newExecutorGroup(final int nThreads, final String poolName, + final int maxPendingTasksPerThread); + + FixedThreadsExecutorGroup newExecutorGroup(final SingleThreadExecutor[] children); + + FixedThreadsExecutorGroup newExecutorGroup(final SingleThreadExecutor[] children, + final ExecutorChooserFactory.ExecutorChooser chooser); + + FixedThreadsExecutorGroup newExecutorGroup(final ExecutorService[] children); + + FixedThreadsExecutorGroup newExecutorGroup(final ExecutorService[] children, + final ExecutorChooserFactory.ExecutorChooser chooser); + +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/SingleThreadExecutor.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/SingleThreadExecutor.java new file mode 100644 index 000000000..fc22fb1fa --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/concurrent/SingleThreadExecutor.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.util.concurrent; + +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +/** + * + * @author jiachun.fjc + */ +public interface SingleThreadExecutor extends Executor { + + /** + * Shortcut method for {@link #shutdownGracefully(long, TimeUnit)} with + * sensible default values. + * @return true if success to shutdown + */ + boolean shutdownGracefully(); + + /** + * Signals this executor that the caller wants it to be shutdown. + * + * @param timeout the maximum amount of time to wait until the executor + * is shutdown + * @param unit the unit of {@code timeout} + * @return true if success to shutdown + */ + boolean shutdownGracefully(final long timeout, final TimeUnit unit); +} diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/RouteTableTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/RouteTableTest.java index c3a7d9406..0367ea64a 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/RouteTableTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/RouteTableTest.java @@ -16,13 +16,6 @@ */ package com.alipay.sofa.jraft; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - import java.io.File; import java.util.ArrayList; import java.util.HashSet; @@ -43,6 +36,13 @@ import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService; import com.alipay.sofa.jraft.test.TestUtils; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + public class RouteTableTest { static final Logger LOG = LoggerFactory.getLogger(RouteTableTest.class); diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java index 10fdf5584..7fa4bf286 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java @@ -16,16 +16,6 @@ */ package com.alipay.sofa.jraft.core; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.io.File; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -75,6 +65,16 @@ import com.alipay.sofa.jraft.util.Utils; import com.codahale.metrics.ConsoleReporter; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class NodeTest { static final Logger LOG = LoggerFactory.getLogger(NodeTest.class); diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReadOnlyServiceTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReadOnlyServiceTest.java index 1f33cf381..e89f5a671 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReadOnlyServiceTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/ReadOnlyServiceTest.java @@ -16,12 +16,6 @@ */ package com.alipay.sofa.jraft.core; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - import java.util.ArrayList; import java.util.Arrays; import java.util.concurrent.CountDownLatch; @@ -51,6 +45,12 @@ import com.alipay.sofa.jraft.util.Bytes; import com.alipay.sofa.jraft.util.Utils; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + @RunWith(MockitoJUnitRunner.class) public class ReadOnlyServiceTest { diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/LogEntryTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/LogEntryTest.java index 3059d02f0..5f1a33dad 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/LogEntryTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/LogEntryTest.java @@ -16,13 +16,6 @@ */ package com.alipay.sofa.jraft.entity; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - import java.nio.ByteBuffer; import java.util.Arrays; @@ -31,6 +24,13 @@ import com.alipay.sofa.jraft.entity.codec.v1.LogEntryV1CodecFactory; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + public class LogEntryTest { @SuppressWarnings("deprecation") diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/LogIdTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/LogIdTest.java index 5a6780b0a..fcee0a254 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/LogIdTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/LogIdTest.java @@ -16,11 +16,11 @@ */ package com.alipay.sofa.jraft.entity; +import org.junit.Test; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import org.junit.Test; - public class LogIdTest { @Test diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/PeerIdTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/PeerIdTest.java index 28c5ad642..2f51fd889 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/PeerIdTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/PeerIdTest.java @@ -16,12 +16,12 @@ */ package com.alipay.sofa.jraft.entity; +import org.junit.Test; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import org.junit.Test; - public class PeerIdTest { @Test diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/codec/BaseLogEntryCodecFactoryTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/codec/BaseLogEntryCodecFactoryTest.java index ad95f03cb..8aec68ce5 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/codec/BaseLogEntryCodecFactoryTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/codec/BaseLogEntryCodecFactoryTest.java @@ -16,12 +16,6 @@ */ package com.alipay.sofa.jraft.entity.codec; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.nio.ByteBuffer; import java.util.Arrays; @@ -34,6 +28,12 @@ import com.alipay.sofa.jraft.entity.LogId; import com.alipay.sofa.jraft.entity.PeerId; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public abstract class BaseLogEntryCodecFactoryTest { protected LogEntryEncoder encoder; diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/codec/LogEntryCodecPerfTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/codec/LogEntryCodecPerfTest.java index 26323f9ad..d97e51e65 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/codec/LogEntryCodecPerfTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/codec/LogEntryCodecPerfTest.java @@ -16,16 +16,14 @@ */ package com.alipay.sofa.jraft.entity.codec; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - import java.nio.ByteBuffer; import java.util.Arrays; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicLong; +import io.netty.util.internal.ThreadLocalRandom; + import org.junit.Before; import org.junit.Test; @@ -38,7 +36,9 @@ import com.alipay.sofa.jraft.entity.codec.v2.V2Encoder; import com.alipay.sofa.jraft.util.Utils; -import io.netty.util.internal.ThreadLocalRandom; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; public class LogEntryCodecPerfTest { diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/codec/v2/LogEntryV2CodecFactoryTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/codec/v2/LogEntryV2CodecFactoryTest.java index 03f3d728d..547865a68 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/codec/v2/LogEntryV2CodecFactoryTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/entity/codec/v2/LogEntryV2CodecFactoryTest.java @@ -16,11 +16,6 @@ */ package com.alipay.sofa.jraft.entity.codec.v2; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - import java.nio.ByteBuffer; import java.util.Arrays; @@ -34,6 +29,11 @@ import com.alipay.sofa.jraft.entity.codec.LogEntryCodecFactory; import com.alipay.sofa.jraft.entity.codec.v1.V1Encoder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + public class LogEntryV2CodecFactoryTest extends BaseLogEntryCodecFactoryTest { @Override diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/AbstractBoltClientServiceTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/AbstractBoltClientServiceTest.java index 5ce216014..541725b32 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/AbstractBoltClientServiceTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/AbstractBoltClientServiceTest.java @@ -16,15 +16,6 @@ */ package com.alipay.sofa.jraft.rpc; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.eq; - import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -54,6 +45,15 @@ import com.alipay.sofa.jraft.util.Endpoint; import com.google.protobuf.Message; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.eq; + @RunWith(value = MockitoJUnitRunner.class) public class AbstractBoltClientServiceTest { static class MockBoltClientService extends AbstractBoltClientService { diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/impl/core/AppendEntriesRequestProcessorTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/impl/core/AppendEntriesRequestProcessorTest.java index 77c41304a..0166ccfba 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/impl/core/AppendEntriesRequestProcessorTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/rpc/impl/core/AppendEntriesRequestProcessorTest.java @@ -93,7 +93,7 @@ public void verify(String interest, RaftServerService service, NodeRequestProces } @Test - public void testGetPeerRequsetContextRemovePeerRequestContext() { + public void testGetPeerRequestContextRemovePeerRequestContext() { mockNode(); final AppendEntriesRequestProcessor processor = (AppendEntriesRequestProcessor) newProcessor(); diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/SnapshotExecutorTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/SnapshotExecutorTest.java index d500f1828..9e168f062 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/SnapshotExecutorTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/SnapshotExecutorTest.java @@ -16,11 +16,6 @@ */ package com.alipay.sofa.jraft.storage; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.eq; - import java.nio.ByteBuffer; import org.junit.After; @@ -65,6 +60,11 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Message; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.eq; + @RunWith(value = MockitoJUnitRunner.class) public class SnapshotExecutorTest extends BaseStorageTest { private SnapshotExecutorImpl executor; diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/LocalRaftMetaStorageTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/LocalRaftMetaStorageTest.java index 8cfcba2ba..fe52645ef 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/LocalRaftMetaStorageTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/LocalRaftMetaStorageTest.java @@ -16,10 +16,6 @@ */ package com.alipay.sofa.jraft.storage.impl; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - import java.io.File; import java.io.IOException; @@ -40,6 +36,10 @@ import com.alipay.sofa.jraft.storage.BaseStorageTest; import com.alipay.sofa.jraft.storage.RaftMetaStorage; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + /** * * @author boyan (boyan@alibaba-inc.com) diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/LogManagerTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/LogManagerTest.java index f013dd96a..07c2d3e6c 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/LogManagerTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/LogManagerTest.java @@ -16,13 +16,6 @@ */ package com.alipay.sofa.jraft.storage.impl; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; - import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -53,6 +46,13 @@ import com.alipay.sofa.jraft.storage.LogStorage; import com.alipay.sofa.jraft.test.TestUtils; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + @RunWith(value = MockitoJUnitRunner.class) public class LogManagerTest extends BaseStorageTest { private LogManagerImpl logManager; diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/RocksDBLogStorageTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/RocksDBLogStorageTest.java index b6e30f19e..d162697e0 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/RocksDBLogStorageTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/RocksDBLogStorageTest.java @@ -16,12 +16,6 @@ */ package com.alipay.sofa.jraft.storage.impl; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - import java.util.Arrays; import java.util.List; @@ -44,6 +38,12 @@ import com.alipay.sofa.jraft.storage.LogStorage; import com.alipay.sofa.jraft.test.TestUtils; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + public class RocksDBLogStorageTest extends BaseStorageTest { private LogStorage logStorage; private ConfigurationManager confManager; diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java index 174390a60..9504d936b 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java @@ -16,12 +16,6 @@ */ package com.alipay.sofa.jraft.storage.snapshot.local; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.eq; - import java.nio.ByteBuffer; import org.junit.After; @@ -55,6 +49,12 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Message; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.eq; + @RunWith(value = MockitoJUnitRunner.class) public class LocalSnapshotCopierTest extends BaseStorageTest { private LocalSnapshotCopier copier; diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/util/CrcUtilTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/util/CrcUtilTest.java index e1f5a91a7..8af05f181 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/util/CrcUtilTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/util/CrcUtilTest.java @@ -16,10 +16,10 @@ */ package com.alipay.sofa.jraft.util; -import static org.junit.Assert.assertEquals; - import org.junit.Test; +import static org.junit.Assert.assertEquals; + public class CrcUtilTest { @Test diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/util/EndpointTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/util/EndpointTest.java index dea545034..4072624e2 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/util/EndpointTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/util/EndpointTest.java @@ -16,10 +16,10 @@ */ package com.alipay.sofa.jraft.util; -import static org.junit.Assert.assertEquals; - import org.junit.Test; +import static org.junit.Assert.assertEquals; + public class EndpointTest { @Test diff --git a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/rheakv/DistributedLockExample.java b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/rheakv/DistributedLockExample.java index f713f9f9f..9b7983b76 100644 --- a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/rheakv/DistributedLockExample.java +++ b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/rheakv/DistributedLockExample.java @@ -24,8 +24,8 @@ import org.slf4j.LoggerFactory; import com.alipay.sofa.jraft.rhea.client.RheaKVStore; -import com.alipay.sofa.jraft.rhea.util.ExecutorServiceHelper; import com.alipay.sofa.jraft.rhea.util.concurrent.DistributedLock; +import com.alipay.sofa.jraft.util.ExecutorServiceHelper; /** * diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/StoreEngine.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/StoreEngine.java index 5f42318bc..ed8835437 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/StoreEngine.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/StoreEngine.java @@ -58,7 +58,6 @@ import com.alipay.sofa.jraft.rhea.storage.RocksRawKVStore; import com.alipay.sofa.jraft.rhea.storage.StorageType; import com.alipay.sofa.jraft.rhea.util.Constants; -import com.alipay.sofa.jraft.rhea.util.ExecutorServiceHelper; import com.alipay.sofa.jraft.rhea.util.Lists; import com.alipay.sofa.jraft.rhea.util.Maps; import com.alipay.sofa.jraft.rhea.util.NetUtil; @@ -66,6 +65,7 @@ import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory; import com.alipay.sofa.jraft.util.BytesUtil; import com.alipay.sofa.jraft.util.Endpoint; +import com.alipay.sofa.jraft.util.ExecutorServiceHelper; import com.alipay.sofa.jraft.util.MetricThreadPoolExecutor; import com.alipay.sofa.jraft.util.Requires; import com.codahale.metrics.ScheduledReporter; diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVRpcService.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVRpcService.java index c54ef89e2..4ac71e75f 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVRpcService.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVRpcService.java @@ -37,10 +37,10 @@ import com.alipay.sofa.jraft.rhea.errors.ErrorsHelper; import com.alipay.sofa.jraft.rhea.options.RpcOptions; import com.alipay.sofa.jraft.rhea.rpc.ExtSerializerSupports; -import com.alipay.sofa.jraft.rhea.util.ExecutorServiceHelper; import com.alipay.sofa.jraft.rhea.util.concurrent.CallerRunsPolicyWithReport; import com.alipay.sofa.jraft.rhea.util.concurrent.NamedThreadFactory; import com.alipay.sofa.jraft.util.Endpoint; +import com.alipay.sofa.jraft.util.ExecutorServiceHelper; import com.alipay.sofa.jraft.util.Requires; import com.alipay.sofa.jraft.util.ThreadPoolUtil; diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/pd/DefaultPlacementDriverRpcService.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/pd/DefaultPlacementDriverRpcService.java index 010ef1887..b8bba096c 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/pd/DefaultPlacementDriverRpcService.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/pd/DefaultPlacementDriverRpcService.java @@ -35,10 +35,10 @@ import com.alipay.sofa.jraft.rhea.errors.ErrorsHelper; import com.alipay.sofa.jraft.rhea.options.RpcOptions; import com.alipay.sofa.jraft.rhea.rpc.ExtSerializerSupports; -import com.alipay.sofa.jraft.rhea.util.ExecutorServiceHelper; import com.alipay.sofa.jraft.rhea.util.concurrent.CallerRunsPolicyWithReport; import com.alipay.sofa.jraft.rhea.util.concurrent.NamedThreadFactory; import com.alipay.sofa.jraft.util.Endpoint; +import com.alipay.sofa.jraft.util.ExecutorServiceHelper; import com.alipay.sofa.jraft.util.Requires; import com.alipay.sofa.jraft.util.ThreadPoolUtil; diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/pd/HeartbeatSender.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/pd/HeartbeatSender.java index 98b2c4074..aeec4a5f7 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/pd/HeartbeatSender.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/pd/HeartbeatSender.java @@ -44,13 +44,13 @@ import com.alipay.sofa.jraft.rhea.options.HeartbeatOptions; import com.alipay.sofa.jraft.rhea.rpc.ExtSerializerSupports; import com.alipay.sofa.jraft.rhea.storage.BaseKVStoreClosure; -import com.alipay.sofa.jraft.rhea.util.ExecutorServiceHelper; import com.alipay.sofa.jraft.rhea.util.Lists; import com.alipay.sofa.jraft.rhea.util.Pair; import com.alipay.sofa.jraft.rhea.util.StackTraceUtil; import com.alipay.sofa.jraft.rhea.util.concurrent.DiscardOldPolicyWithReport; import com.alipay.sofa.jraft.rhea.util.concurrent.NamedThreadFactory; import com.alipay.sofa.jraft.util.Endpoint; +import com.alipay.sofa.jraft.util.ExecutorServiceHelper; import com.alipay.sofa.jraft.util.ThreadPoolUtil; import com.alipay.sofa.jraft.util.timer.HashedWheelTimer; import com.alipay.sofa.jraft.util.timer.Timeout; diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MetricsRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MetricsRawKVStore.java index 41ff76a20..4dc648754 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MetricsRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MetricsRawKVStore.java @@ -23,7 +23,6 @@ import com.codahale.metrics.Timer; import static com.alipay.sofa.jraft.rhea.metrics.KVMetricNames.RPC_REQUEST_HANDLE_TIMER; -import static com.alipay.sofa.jraft.rhea.storage.KVOperation.PUT_LIST; import static com.alipay.sofa.jraft.rhea.storage.KVOperation.DELETE; import static com.alipay.sofa.jraft.rhea.storage.KVOperation.DELETE_RANGE; import static com.alipay.sofa.jraft.rhea.storage.KVOperation.GET; @@ -36,6 +35,7 @@ import static com.alipay.sofa.jraft.rhea.storage.KVOperation.NODE_EXECUTE; import static com.alipay.sofa.jraft.rhea.storage.KVOperation.PUT; import static com.alipay.sofa.jraft.rhea.storage.KVOperation.PUT_IF_ABSENT; +import static com.alipay.sofa.jraft.rhea.storage.KVOperation.PUT_LIST; import static com.alipay.sofa.jraft.rhea.storage.KVOperation.RESET_SEQUENCE; import static com.alipay.sofa.jraft.rhea.storage.KVOperation.SCAN; diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/ByteObjectHashMap.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/ByteObjectHashMap.java index cf7b17b73..ec9d12896 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/ByteObjectHashMap.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/ByteObjectHashMap.java @@ -23,6 +23,8 @@ import java.util.NoSuchElementException; import java.util.Set; +import com.alipay.sofa.jraft.util.Ints; + /** * A hash map implementation of {@link ByteObjectMap} that uses open addressing for keys. * To minimize the memory footprint, this class uses open addressing rather than chaining. diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/Lists.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/Lists.java index ce2c0f270..2d195a989 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/Lists.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/Lists.java @@ -28,6 +28,7 @@ import java.util.RandomAccess; import java.util.function.Function; +import com.alipay.sofa.jraft.util.Ints; import com.alipay.sofa.jraft.util.Requires; /** diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/Maps.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/Maps.java index 0a2b9ae27..5760f593c 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/Maps.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/Maps.java @@ -26,6 +26,7 @@ import com.alipay.sofa.jraft.rhea.util.concurrent.collection.NonBlockingHashMap; import com.alipay.sofa.jraft.rhea.util.concurrent.collection.NonBlockingHashMapLong; +import com.alipay.sofa.jraft.util.Ints; import com.alipay.sofa.jraft.util.Requires; import com.alipay.sofa.jraft.util.SystemPropertyUtil; import com.alipay.sofa.jraft.util.internal.UnsafeUtil; diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/concurrent/disruptor/TaskDispatcher.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/concurrent/disruptor/TaskDispatcher.java index e4a7ae6d4..64b3ec89a 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/concurrent/disruptor/TaskDispatcher.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/concurrent/disruptor/TaskDispatcher.java @@ -19,7 +19,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import com.alipay.sofa.jraft.rhea.util.Ints; +import com.alipay.sofa.jraft.util.Ints; import com.alipay.sofa.jraft.util.Requires; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.BusySpinWaitStrategy; diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/pipeline/DefaultHandlerInvoker.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/pipeline/DefaultHandlerInvoker.java index 44ae9fba0..ff79d7e87 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/pipeline/DefaultHandlerInvoker.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/pipeline/DefaultHandlerInvoker.java @@ -21,11 +21,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.alipay.sofa.jraft.rhea.util.ExecutorServiceHelper; import com.alipay.sofa.jraft.rhea.util.StackTraceUtil; import com.alipay.sofa.jraft.rhea.util.pipeline.event.InboundMessageEvent; import com.alipay.sofa.jraft.rhea.util.pipeline.event.MessageEvent; import com.alipay.sofa.jraft.rhea.util.pipeline.event.OutboundMessageEvent; +import com.alipay.sofa.jraft.util.ExecutorServiceHelper; /** * diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/chaos/AbstractChaosTest.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/chaos/AbstractChaosTest.java index afa5319a9..4e3df1b3d 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/chaos/AbstractChaosTest.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/chaos/AbstractChaosTest.java @@ -39,8 +39,8 @@ import com.alipay.sofa.jraft.rhea.client.RheaKVStore; import com.alipay.sofa.jraft.rhea.storage.StorageType; import com.alipay.sofa.jraft.rhea.util.Constants; -import com.alipay.sofa.jraft.rhea.util.ExecutorServiceHelper; import com.alipay.sofa.jraft.util.BytesUtil; +import com.alipay.sofa.jraft.util.ExecutorServiceHelper; import com.alipay.sofa.jraft.util.NamedThreadFactory; /** diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/AbstractDistributedLockTest.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/AbstractDistributedLockTest.java index 7568e5789..419ddb9ba 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/AbstractDistributedLockTest.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/AbstractDistributedLockTest.java @@ -30,9 +30,9 @@ import com.alipay.sofa.jraft.rhea.client.RheaKVStore; import com.alipay.sofa.jraft.rhea.errors.InvalidLockAcquirerException; import com.alipay.sofa.jraft.rhea.storage.StorageType; -import com.alipay.sofa.jraft.rhea.util.ExecutorServiceHelper; import com.alipay.sofa.jraft.rhea.util.concurrent.DistributedLock; import com.alipay.sofa.jraft.util.BytesUtil; +import com.alipay.sofa.jraft.util.ExecutorServiceHelper; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; diff --git a/jraft-rheakv/rheakv-pd/src/main/java/com/alipay/sofa/jraft/rhea/PlacementDriverServer.java b/jraft-rheakv/rheakv-pd/src/main/java/com/alipay/sofa/jraft/rhea/PlacementDriverServer.java index 2ea4585c6..5c61aedd7 100644 --- a/jraft-rheakv/rheakv-pd/src/main/java/com/alipay/sofa/jraft/rhea/PlacementDriverServer.java +++ b/jraft-rheakv/rheakv-pd/src/main/java/com/alipay/sofa/jraft/rhea/PlacementDriverServer.java @@ -39,10 +39,10 @@ import com.alipay.sofa.jraft.rhea.options.PlacementDriverServerOptions; import com.alipay.sofa.jraft.rhea.options.RheaKVStoreOptions; import com.alipay.sofa.jraft.rhea.util.Constants; -import com.alipay.sofa.jraft.rhea.util.ExecutorServiceHelper; import com.alipay.sofa.jraft.rhea.util.concurrent.CallerRunsPolicyWithReport; import com.alipay.sofa.jraft.rhea.util.concurrent.NamedThreadFactory; import com.alipay.sofa.jraft.util.Endpoint; +import com.alipay.sofa.jraft.util.ExecutorServiceHelper; import com.alipay.sofa.jraft.util.Requires; import com.alipay.sofa.jraft.util.ThreadPoolUtil;