From b6571da6de35ea0326395aa6124dafd3915d4cb1 Mon Sep 17 00:00:00 2001 From: wxm <115806199+youfanx@users.noreply.github.com> Date: Fri, 30 Aug 2024 11:02:08 +0800 Subject: [PATCH] dynamicSizeByCpuLoad --- .../main/java/org/rx/core/CpuWatchman.java | 46 +++++++++++-------- rxlib/src/main/java/org/rx/core/RxConfig.java | 3 ++ .../src/main/java/org/rx/core/ThreadPool.java | 17 ++----- rxlib/src/main/resources/rx.yml | 1 + 4 files changed, 37 insertions(+), 30 deletions(-) diff --git a/rxlib/src/main/java/org/rx/core/CpuWatchman.java b/rxlib/src/main/java/org/rx/core/CpuWatchman.java index 941590e4..918fe5ea 100644 --- a/rxlib/src/main/java/org/rx/core/CpuWatchman.java +++ b/rxlib/src/main/java/org/rx/core/CpuWatchman.java @@ -6,6 +6,7 @@ import io.netty.util.Timeout; import io.netty.util.TimerTask; import lombok.Getter; +import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.rx.bean.*; @@ -157,7 +158,7 @@ static int decrSize(ThreadPoolExecutor pool) { return poolSize; } - final Map> holder = new WeakIdentityMap<>(8); + final Map> holder = new WeakIdentityMap<>(8); private CpuWatchman() { timer.newTimeout(this, RxConfig.INSTANCE.threadPool.samplingPeriod, TimeUnit.MILLISECONDS); @@ -165,9 +166,10 @@ private CpuWatchman() { @Override public void run(Timeout timeout) throws Exception { + RxConfig.ThreadPoolConfig conf = RxConfig.INSTANCE.threadPool; try { - Decimal cpuLoad = Decimal.valueOf(osMx.getSystemCpuLoad() * 100); - for (Map.Entry> entry : holder.entrySet()) { + Decimal cpuLoad = Decimal.valueOf(conf.watchSystemCpu ? osMx.getSystemCpuLoad() : osMx.getProcessCpuLoad() * 100); + for (Map.Entry> entry : holder.entrySet()) { ThreadPoolExecutor pool = entry.getKey(); if (pool instanceof ScheduledExecutorService) { scheduledThread(cpuLoad, pool, entry.getValue()); @@ -176,14 +178,15 @@ public void run(Timeout timeout) throws Exception { thread(cpuLoad, pool, entry.getValue()); } } finally { - timer.newTimeout(this, RxConfig.INSTANCE.threadPool.samplingPeriod, TimeUnit.MILLISECONDS); + timer.newTimeout(this, conf.samplingPeriod, TimeUnit.MILLISECONDS); } } - private void thread(Decimal cpuLoad, ThreadPoolExecutor pool, BiTuple tuple) { + private void thread(Decimal cpuLoad, ThreadPoolExecutor pool, Tuple tuple) { IntWaterMark waterMark = tuple.left; - int decrementCounter = tuple.middle; - int incrementCounter = tuple.right; + int[] counter = tuple.right; + int decrementCounter = counter[0]; + int incrementCounter = counter[1]; String prefix = pool.toString(); if (log.isDebugEnabled()) { @@ -214,14 +217,15 @@ private void thread(Decimal cpuLoad, ThreadPoolExecutor pool, BiTuple tuple) { + private void scheduledThread(Decimal cpuLoad, ThreadPoolExecutor pool, Tuple tuple) { IntWaterMark waterMark = tuple.left; - int decrementCounter = tuple.middle; - int incrementCounter = tuple.right; + int[] counter = tuple.right; + int decrementCounter = counter[0]; + int incrementCounter = counter[1]; String prefix = pool.toString(); int active = pool.getActiveCount(); @@ -252,15 +256,21 @@ private void scheduledThread(Decimal cpuLoad, ThreadPoolExecutor pool, BiTuple 100) { + waterMark.setHigh(100); } + holder.put(pool, Tuple.of(waterMark, new int[2])); + } - holder.put(pool, BiTuple.of(cpuWaterMark, 0, 0)); + public void unregister(@NonNull ThreadPoolExecutor pool) { + holder.remove(pool); } } diff --git a/rxlib/src/main/java/org/rx/core/RxConfig.java b/rxlib/src/main/java/org/rx/core/RxConfig.java index 7e199687..aa86adb4 100644 --- a/rxlib/src/main/java/org/rx/core/RxConfig.java +++ b/rxlib/src/main/java/org/rx/core/RxConfig.java @@ -37,6 +37,7 @@ public interface ConfigNames { String THREAD_POOL_QUEUE_CAPACITY = "app.threadPool.queueCapacity"; String THREAD_POOL_LOW_CPU_WATER_MARK = "app.threadPool.lowCpuWaterMark"; String THREAD_POOL_HIGH_CPU_WATER_MARK = "app.threadPool.highCpuWaterMark"; + String THREAD_POOL_WATCH_SYSTEM_CPU = "app.threadPool.watchSystemCpu"; String THREAD_POOL_REPLICAS = "app.threadPool.replicas"; String THREAD_POOL_TRACE_NAME = "app.threadPool.traceName"; String THREAD_POOL_SLOW_METHOD_SAMPLING_PERCENT = "app.threadPool.slowMethodSamplingPercent"; @@ -109,6 +110,7 @@ public static class ThreadPoolConfig { int queueCapacity; int lowCpuWaterMark; int highCpuWaterMark; + boolean watchSystemCpu; int replicas; String traceName; int maxTraceDepth; @@ -286,6 +288,7 @@ public void refreshFromSystemProperty() { threadPool.queueCapacity = SystemPropertyUtil.getInt(ConfigNames.THREAD_POOL_QUEUE_CAPACITY, threadPool.queueCapacity); threadPool.lowCpuWaterMark = SystemPropertyUtil.getInt(ConfigNames.THREAD_POOL_LOW_CPU_WATER_MARK, threadPool.lowCpuWaterMark); threadPool.highCpuWaterMark = SystemPropertyUtil.getInt(ConfigNames.THREAD_POOL_HIGH_CPU_WATER_MARK, threadPool.highCpuWaterMark); + threadPool.watchSystemCpu = SystemPropertyUtil.getBoolean(ConfigNames.THREAD_POOL_WATCH_SYSTEM_CPU, threadPool.watchSystemCpu); threadPool.replicas = SystemPropertyUtil.getInt(ConfigNames.THREAD_POOL_REPLICAS, threadPool.replicas); threadPool.traceName = SystemPropertyUtil.get(ConfigNames.THREAD_POOL_TRACE_NAME); threadPool.maxTraceDepth = SystemPropertyUtil.getInt(ConfigNames.THREAD_POOL_MAX_TRACE_DEPTH, threadPool.maxTraceDepth); diff --git a/rxlib/src/main/java/org/rx/core/ThreadPool.java b/rxlib/src/main/java/org/rx/core/ThreadPool.java index 949b0e6f..55a07972 100644 --- a/rxlib/src/main/java/org/rx/core/ThreadPool.java +++ b/rxlib/src/main/java/org/rx/core/ThreadPool.java @@ -392,11 +392,7 @@ public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { public ThreadPool(String poolName) { //computeThreads(1, 2, 1) - this(RxConfig.INSTANCE.threadPool.initSize, RxConfig.INSTANCE.threadPool.queueCapacity, poolName); - } - - public ThreadPool(int initSize, int queueCapacity, String poolName) { - this(initSize, queueCapacity, DEFAULT_CPU_WATER_MARK, poolName); + this(RxConfig.INSTANCE.threadPool.initSize, RxConfig.INSTANCE.threadPool.queueCapacity, null, poolName); } /** @@ -418,7 +414,7 @@ RxConfig.INSTANCE.threadPool.keepAliveSeconds, TimeUnit.SECONDS, new ThreadQueue ((ThreadQueue) super.getQueue()).pool = this; this.poolName = poolName; - setDynamicSize(cpuWaterMark); + dynamicSizeByCpuLoad(cpuWaterMark); } private static int checkSize(int size) { @@ -436,12 +432,9 @@ private static int checkCapacity(int capacity) { return capacity; } - public void setDynamicSize(IntWaterMark cpuWaterMark) { - if (cpuWaterMark.getLow() < 0) { - cpuWaterMark.setLow(0); - } - if (cpuWaterMark.getHigh() > 100) { - cpuWaterMark.setHigh(100); + public void dynamicSizeByCpuLoad(IntWaterMark cpuWaterMark) { + if (cpuWaterMark == null) { + cpuWaterMark = DEFAULT_CPU_WATER_MARK; } CpuWatchman.INSTANCE.register(this, cpuWaterMark); } diff --git a/rxlib/src/main/resources/rx.yml b/rxlib/src/main/resources/rx.yml index 1bf76c04..0723af97 100644 --- a/rxlib/src/main/resources/rx.yml +++ b/rxlib/src/main/resources/rx.yml @@ -13,6 +13,7 @@ app: queueCapacity: 0 lowCpuWaterMark: 40 highCpuWaterMark: 70 + watchSystemCpu: false replicas: 2 maxTraceDepth: 5 slowMethodSamplingPercent: 2