Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(feat) configure the number of processors #180 #181

Merged
merged 1 commit into from
Jun 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,12 @@ public class Utils {
private static final Logger LOG = LoggerFactory.getLogger(Utils.class);

/**
* Current system CPUs count.
* The configured number of available processors. The default is {@link Runtime#availableProcessors()}.
* This can be overridden by setting the system property "jraft.available_processors".
*/
private static final int CPUS = Runtime.getRuntime().availableProcessors();
private static final int CPUS = SystemPropertyUtil.getInt(
"jraft.available_processors", Runtime
.getRuntime().availableProcessors());

/**
* Default jraft closure executor pool minimum size, CPUs by default.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.LogExceptionHandler;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.Utils;
import com.codahale.metrics.Histogram;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
Expand Down Expand Up @@ -257,7 +258,7 @@ public Endpoint getLeader(final long regionId, final boolean forceRefresh, final
this.futureTimeoutMillis = opts.getFutureTimeoutMillis();
this.onlyLeaderRead = opts.isOnlyLeaderRead();
if (opts.isUseParallelKVExecutor()) {
final int numWorkers = Constants.AVAILABLE_PROCESSORS;
final int numWorkers = Utils.cpus();
final int bufSize = numWorkers << 4;
final String name = "parallel-kv-executor";
final ThreadFactory threadFactory = Constants.THREAD_AFFINITY_ENABLED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@
*/
package com.alipay.sofa.jraft.rhea.options;

import com.alipay.sofa.jraft.rhea.util.Constants;
import com.alipay.sofa.jraft.util.Utils;

/**
*
* @author jiachun.fjc
*/
public class RpcOptions {

private int callbackExecutorCorePoolSize = Constants.AVAILABLE_PROCESSORS << 2;
private int callbackExecutorMaximumPoolSize = Constants.AVAILABLE_PROCESSORS << 3;
private int callbackExecutorCorePoolSize = Utils.cpus() << 2;
private int callbackExecutorMaximumPoolSize = Utils.cpus() << 3;
private int callbackExecutorQueueCapacity = 512;
private int rpcTimeoutMillis = 5000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.rhea.storage.StorageType;
import com.alipay.sofa.jraft.rhea.util.Constants;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.Utils;

/**
*
Expand All @@ -43,12 +43,12 @@ public class StoreEngineOptions {
private HeartbeatOptions heartbeatOptions;
private boolean useSharedRpcExecutor;
// thread poll number of threads
private int readIndexCoreThreads = Math.max(Constants.AVAILABLE_PROCESSORS << 2, 16);
private int readIndexCoreThreads = Math.max(Utils.cpus() << 2, 16);
private int leaderStateTriggerCoreThreads = 4;
private int snapshotCoreThreads = 1;
private int cliRpcCoreThreads = Constants.AVAILABLE_PROCESSORS << 2;
private int raftRpcCoreThreads = Math.max(Constants.AVAILABLE_PROCESSORS << 3, 32);
private int kvRpcCoreThreads = Math.max(Constants.AVAILABLE_PROCESSORS << 3, 32);
private int cliRpcCoreThreads = Utils.cpus() << 2;
private int raftRpcCoreThreads = Math.max(Utils.cpus() << 3, 32);
private int kvRpcCoreThreads = Math.max(Utils.cpus() << 3, 32);
// metrics schedule option (seconds), won't start reporter id metricsReportPeriod <= 0
private long metricsReportPeriod = TimeUnit.MINUTES.toSeconds(5);
// the minimum number of keys required to split, less than this value will refuse to split
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ public final class Constants {
// TODO support ipv6
public static final String IP_ANY = "0.0.0.0";

/** CPU cores */
public static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();

public static final boolean THREAD_AFFINITY_ENABLED = SystemPropertyUtil.getBoolean("rhea.thread.affinity.enabled",
false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,15 @@
import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.ExecutorServiceHelper;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.Utils;

/**
*
* @author jiachun.fjc
*/
public abstract class AbstractChaosTest {

private static final int LOOP_1 = Constants.AVAILABLE_PROCESSORS;
private static final int LOOP_1 = Utils.cpus();
private static final int LOOP_2 = 20;
private static final int INITIAL_PEER_COUNT = 5;
private static final int RETRIES = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@
import com.alipay.sofa.jraft.rhea.cmd.pd.StoreHeartbeatRequest;
import com.alipay.sofa.jraft.rhea.options.PlacementDriverServerOptions;
import com.alipay.sofa.jraft.rhea.options.RheaKVStoreOptions;
import com.alipay.sofa.jraft.rhea.util.Constants;
import com.alipay.sofa.jraft.rhea.util.concurrent.CallerRunsPolicyWithReport;
import com.alipay.sofa.jraft.rhea.util.concurrent.NamedThreadFactory;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ExecutorServiceHelper;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.alipay.sofa.jraft.util.Utils;

/**
* PlacementDriverServer is a role responsible for overall global control.
Expand Down Expand Up @@ -218,7 +218,7 @@ private void addPlacementDriverProcessor(final RpcServer rpcServer) {
}

private ThreadPoolExecutor createDefaultPdExecutor() {
final int corePoolSize = Math.max(Constants.AVAILABLE_PROCESSORS << 2, 32);
final int corePoolSize = Math.max(Utils.cpus() << 2, 32);
final String name = "rheakv-pd-executor";
return ThreadPoolUtil.newBuilder() //
.poolName(name) //
Expand Down