Skip to content

Commit

Permalink
fix 1072: provides a way to safely set the thread pool size (#1075)
Browse files Browse the repository at this point in the history
  • Loading branch information
jjiey authored Mar 9, 2023
1 parent 63ec2dc commit 2546f04
Show file tree
Hide file tree
Showing 12 changed files with 77 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import cn.hippo4j.adapter.base.ThreadPoolAdapter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.store.DataStore;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -79,8 +80,7 @@ public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterPara
}
int originalCoreSize = executor.getCorePoolSize();
int originalMaximumPoolSize = executor.getMaximumPoolSize();
executor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize());
executor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
ThreadPoolExecutorUtil.safeSetPoolSize(executor, threadPoolAdapterParameter.getCorePoolSize(), threadPoolAdapterParameter.getMaximumPoolSize());
log.info("[{}] Alibaba Dubbo consumption thread pool parameter change. coreSize: {}, maximumSize: {}",
threadPoolKey,
String.format(CHANGE_DELIMITER, originalCoreSize, executor.getCorePoolSize()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.extension.ExtensionLoader;
Expand Down Expand Up @@ -83,8 +84,7 @@ public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterPara
}
int originalCoreSize = executor.getCorePoolSize();
int originalMaximumPoolSize = executor.getMaximumPoolSize();
executor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize());
executor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
ThreadPoolExecutorUtil.safeSetPoolSize(executor, threadPoolAdapterParameter.getCorePoolSize(), threadPoolAdapterParameter.getMaximumPoolSize());
log.info("[{}] Dubbo consumption thread pool parameter change. coreSize: {}, maximumSize: {}",
threadPoolKey,
String.format(CHANGE_DELIMITER, originalCoreSize, executor.getCorePoolSize()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import com.netflix.hystrix.HystrixThreadPool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationStartedEvent;
Expand Down Expand Up @@ -96,8 +97,7 @@ public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterPara
}
int originalCoreSize = threadPoolExecutor.getCorePoolSize();
int originalMaximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
threadPoolExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize());
threadPoolExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
ThreadPoolExecutorUtil.safeSetPoolSize(threadPoolExecutor, threadPoolAdapterParameter.getCorePoolSize(), threadPoolAdapterParameter.getMaximumPoolSize());
log.info("[{}] Hystrix thread pool parameter change. coreSize: {}, maximumSize: {}",
threadPoolKey,
String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolExecutor.getCorePoolSize()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory;
Expand Down Expand Up @@ -84,8 +85,7 @@ public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterPara
if (Objects.nonNull(threadPoolTaskExecutor)) {
int originalCoreSize = threadPoolTaskExecutor.getCorePoolSize();
int originalMaximumPoolSize = threadPoolTaskExecutor.getMaximumPoolSize();
threadPoolTaskExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
threadPoolTaskExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize());
ThreadPoolExecutorUtil.safeSetPoolSize(threadPoolTaskExecutor, threadPoolAdapterParameter.getCorePoolSize(), threadPoolAdapterParameter.getMaximumPoolSize());
log.info("[{}] Rabbitmq consumption thread pool parameter change. coreSize: {}, maximumSize: {}",
threadPoolKey,
String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolAdapterParameter.getCorePoolSize()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService;
Expand Down Expand Up @@ -79,8 +80,7 @@ public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterPara
if (rocketMQConsumeExecutor != null) {
int originalCoreSize = rocketMQConsumeExecutor.getCorePoolSize();
int originalMaximumPoolSize = rocketMQConsumeExecutor.getMaximumPoolSize();
rocketMQConsumeExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize());
rocketMQConsumeExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
ThreadPoolExecutorUtil.safeSetPoolSize(rocketMQConsumeExecutor, threadPoolAdapterParameter.getCorePoolSize(), threadPoolAdapterParameter.getMaximumPoolSize());
log.info("[{}] RocketMQ consumption thread pool parameter change. coreSize: {}, maximumSize: {}",
threadPoolKey,
String.format(CHANGE_DELIMITER, originalCoreSize, rocketMQConsumeExecutor.getCorePoolSize()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -87,8 +88,7 @@ public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterPara
if (rocketMQConsumeExecutor != null) {
int originalCoreSize = rocketMQConsumeExecutor.getCorePoolSize();
int originalMaximumPoolSize = rocketMQConsumeExecutor.getMaximumPoolSize();
rocketMQConsumeExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize());
rocketMQConsumeExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
ThreadPoolExecutorUtil.safeSetPoolSize(rocketMQConsumeExecutor, threadPoolAdapterParameter.getCorePoolSize(), threadPoolAdapterParameter.getMaximumPoolSize());
log.info("[{}] RocketMQ consumption thread pool parameter change. coreSize: {}, maximumSize: {}",
threadPoolKey,
String.format(CHANGE_DELIMITER, originalCoreSize, rocketMQConsumeExecutor.getCorePoolSize()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,14 @@ public void updateWebThreadPool(ThreadPoolParameterInfo threadPoolParameterInfo)
int originalCoreSize = tomcatThreadPoolExecutor.getCorePoolSize();
int originalMaximumPoolSize = tomcatThreadPoolExecutor.getMaximumPoolSize();
long originalKeepAliveTime = tomcatThreadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS);
tomcatThreadPoolExecutor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt());
tomcatThreadPoolExecutor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt());
// see cn.hippo4j.common.toolkit.ThreadPoolUtil#setCoreSizeAndMaximumSize
if (threadPoolParameterInfo.corePoolSizeAdapt() > originalMaximumPoolSize) {
tomcatThreadPoolExecutor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt());
tomcatThreadPoolExecutor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt());
} else {
tomcatThreadPoolExecutor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt());
tomcatThreadPoolExecutor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt());
}
tomcatThreadPoolExecutor.setKeepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS);
log.info("[Tomcat] Changed web thread pool. corePoolSize: {}, maximumPoolSize: {}, keepAliveTime: {}",
String.format(ChangeThreadPoolConstants.CHANGE_DELIMITER, originalCoreSize, threadPoolParameterInfo.corePoolSizeAdapt()),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.hippo4j.common.toolkit;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;

import java.util.concurrent.ThreadPoolExecutor;

/**
* Thread pool util
*
* @author yangjie
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ThreadPoolExecutorUtil {

/**
* Set the thread pool size in a safe way.
* <p>
* see https://github.com/opengoofy/hippo4j/issues/1072
*/
public static void safeSetPoolSize(ThreadPoolExecutor executor, int newCorePoolSize, int newMaximumPoolSize) {
Assert.isTrue(newCorePoolSize <= newMaximumPoolSize, "newCorePoolSize must be smaller than newMaximumPoolSize");
int originalMaximumPoolSize = executor.getMaximumPoolSize();
if (newCorePoolSize > originalMaximumPoolSize) {
executor.setMaximumPoolSize(newMaximumPoolSize);
executor.setCorePoolSize(newCorePoolSize);
} else {
executor.setCorePoolSize(newCorePoolSize);
executor.setMaximumPoolSize(newMaximumPoolSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.config.springboot.starter.config.ExecutorProperties;
import cn.hippo4j.config.springboot.starter.notify.ConfigModeNotifyConfigBuilder;
Expand Down Expand Up @@ -240,13 +241,7 @@ private void dynamicRefreshPool(String threadPoolId, ExecutorProperties properti
ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId());
ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor();
if (properties.getMaximumPoolSize() != null && properties.getCorePoolSize() != null) {
if (properties.getMaximumPoolSize() < executor.getMaximumPoolSize()) {
executor.setCorePoolSize(properties.getCorePoolSize());
executor.setMaximumPoolSize(properties.getMaximumPoolSize());
} else {
executor.setMaximumPoolSize(properties.getMaximumPoolSize());
executor.setCorePoolSize(properties.getCorePoolSize());
}
ThreadPoolExecutorUtil.safeSetPoolSize(executor, properties.getCorePoolSize(), properties.getMaximumPoolSize());
} else {
if (properties.getMaximumPoolSize() != null) {
executor.setMaximumPoolSize(properties.getMaximumPoolSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose;
import cn.hippo4j.core.toolkit.DynamicThreadPoolAnnotationUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import cn.hippo4j.message.service.GlobalNotifyAlarmManage;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import lombok.AllArgsConstructor;
Expand Down Expand Up @@ -172,13 +173,7 @@ private void threadPoolParamReplace(ThreadPoolExecutor executor, ExecutorPropert
BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(executorProperties.getBlockingQueue(), executorProperties.getQueueCapacity());
ReflectUtil.setFieldValue(executor, "workQueue", workQueue);
// fix https://github.com/opengoofy/hippo4j/issues/1063
if (executorProperties.getCorePoolSize() > executor.getMaximumPoolSize()) {
executor.setMaximumPoolSize(executorProperties.getMaximumPoolSize());
executor.setCorePoolSize(executorProperties.getCorePoolSize());
} else {
executor.setCorePoolSize(executorProperties.getCorePoolSize());
executor.setMaximumPoolSize(executorProperties.getMaximumPoolSize());
}
ThreadPoolExecutorUtil.safeSetPoolSize(executor, executorProperties.getCorePoolSize(), executorProperties.getMaximumPoolSize());
executor.setKeepAliveTime(executorProperties.getKeepAliveTime(), TimeUnit.SECONDS);
executor.allowCoreThreadTimeOut(executorProperties.getAllowCoreThreadTimeOut());
executor.setRejectedExecutionHandler(RejectedPolicyTypeEnum.createPolicy(executorProperties.getRejectedHandler()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
Expand Down Expand Up @@ -109,13 +110,7 @@ private void refreshDynamicPool(ThreadPoolParameter parameter, ThreadPoolExecuto

private void changePoolInfo(ThreadPoolExecutor executor, ThreadPoolParameter parameter) {
if (parameter.getCoreSize() != null && parameter.getMaxSize() != null) {
if (parameter.getMaxSize() < executor.getMaximumPoolSize()) {
executor.setCorePoolSize(parameter.getCoreSize());
executor.setMaximumPoolSize(parameter.getMaxSize());
} else {
executor.setMaximumPoolSize(parameter.getMaxSize());
executor.setCorePoolSize(parameter.getCoreSize());
}
ThreadPoolExecutorUtil.safeSetPoolSize(executor, parameter.getCoreSize(), parameter.getMaxSize());
} else {
if (parameter.getMaxSize() != null) {
executor.setMaximumPoolSize(parameter.getMaxSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose;
import cn.hippo4j.core.toolkit.DynamicThreadPoolAnnotationUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import cn.hippo4j.message.service.GlobalNotifyAlarmManage;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import cn.hippo4j.springboot.starter.config.BootstrapProperties;
Expand Down Expand Up @@ -179,13 +180,7 @@ private void threadPoolParamReplace(ThreadPoolExecutor executor, ThreadPoolParam
BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(threadPoolParameterInfo.getQueueType(), threadPoolParameterInfo.getCapacity());
ReflectUtil.setFieldValue(executor, "workQueue", workQueue);
// fix https://github.com/opengoofy/hippo4j/issues/1063
if (threadPoolParameterInfo.getCorePoolSize() > executor.getMaximumPoolSize()) {
executor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt());
executor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt());
} else {
executor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt());
executor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt());
}
ThreadPoolExecutorUtil.safeSetPoolSize(executor, threadPoolParameterInfo.corePoolSizeAdapt(), threadPoolParameterInfo.maximumPoolSizeAdapt());
executor.setKeepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS);
executor.allowCoreThreadTimeOut(EnableEnum.getBool(threadPoolParameterInfo.getAllowCoreThreadTimeOut()));
executor.setRejectedExecutionHandler(RejectedPolicyTypeEnum.createPolicy(threadPoolParameterInfo.getRejectedType()));
Expand Down

0 comments on commit 2546f04

Please sign in to comment.