Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the ability to notify changes to the web container thread pool under the Config mode. #1133

Merged
merged 12 commits into from
Apr 11, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,10 @@
package cn.hippo4j.adapter.web.jetty;

import cn.hippo4j.adapter.web.DefaultAbstractWebThreadPoolService;
import cn.hippo4j.adapter.web.IWebThreadPoolHandlerSupport;
import cn.hippo4j.common.constant.ChangeThreadPoolConstants;
import cn.hippo4j.common.enums.WebContainerEnum;
import cn.hippo4j.common.model.ThreadPoolBaseInfo;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import cn.hippo4j.common.toolkit.ReflectUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.springframework.boot.web.embedded.jetty.JettyWebServer;
import org.springframework.boot.web.server.WebServer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public ThreadPoolParameter getWebThreadPoolParameter() {
long keepAliveTime = tomcatThreadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS);
parameterInfo.setCoreSize(minThreads);
parameterInfo.setMaxSize(maxThreads);
parameterInfo.setKeepAliveTime((int) keepAliveTime);
parameterInfo.setKeepAliveTime(keepAliveTime);
} catch (Exception ex) {
log.error("Failed to get the tomcat thread pool parameter.", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@
import java.util.concurrent.Executor;

import cn.hippo4j.adapter.web.DefaultAbstractWebThreadPoolService;
import cn.hippo4j.adapter.web.IWebThreadPoolHandlerSupport;
import cn.hippo4j.common.enums.WebContainerEnum;
import cn.hippo4j.common.model.ThreadPoolBaseInfo;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import io.undertow.Undertow;
import lombok.extern.slf4j.Slf4j;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public ThreadPoolParameter getWebThreadPoolParameter() {
XnioWorker xnioWorker = (XnioWorker) executor;
int minThreads = xnioWorker.getOption(Options.WORKER_TASK_CORE_THREADS);
int maxThreads = xnioWorker.getOption(Options.WORKER_TASK_MAX_THREADS);
int keepAliveTime = xnioWorker.getOption(Options.WORKER_TASK_KEEPALIVE);
long keepAliveTime = xnioWorker.getOption(Options.WORKER_TASK_KEEPALIVE);
parameterInfo.setCoreSize(minThreads);
parameterInfo.setMaxSize(maxThreads);
parameterInfo.setKeepAliveTime(keepAliveTime);
Expand Down Expand Up @@ -135,7 +135,7 @@ public void updateWebThreadPool(ThreadPoolParameterInfo threadPoolParameterInfo)
XnioWorker xnioWorker = (XnioWorker) executor;
Integer coreSize = threadPoolParameterInfo.corePoolSizeAdapt();
Integer maxSize = threadPoolParameterInfo.maximumPoolSizeAdapt();
Integer keepAliveTime = threadPoolParameterInfo.getKeepAliveTime();
int keepAliveTime = threadPoolParameterInfo.getKeepAliveTime().intValue();
int originalCoreSize = xnioWorker.getOption(Options.WORKER_TASK_CORE_THREADS);
int originalMaximumPoolSize = xnioWorker.getOption(Options.WORKER_TASK_MAX_THREADS);
int originalKeepAliveTime = xnioWorker.getOption(Options.WORKER_TASK_KEEPALIVE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package cn.hippo4j.common.config;

import cn.hippo4j.common.enums.ProfileEnum;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
Expand Down Expand Up @@ -90,4 +91,12 @@ public static <A extends Annotation> A findAnnotationOnBean(String beanName, Cla
public static ApplicationContext getInstance() {
return CONTEXT;
}

/**
* Get current application profile.
*/
public static ProfileEnum getActiveProfile() {
return ProfileEnum.of(CONTEXT.getEnvironment().getActiveProfiles()[0]);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.hippo4j.common.enums;

import java.util.Arrays;

/**
* Application profile enum.
*/
public enum ProfileEnum {

UNKNOWN,
DEV,
TEST01,
PRE,
PROD;

public static ProfileEnum of(String profileStr) {
return Arrays.stream(values())
.filter(v -> v.name().equalsIgnoreCase(profileStr))
.findFirst()
.orElse(UNKNOWN);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public interface ThreadPoolParameter {
*
* @return
*/
Integer getKeepAliveTime();
Long getKeepAliveTime();

/**
* Get execute time out
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public class ThreadPoolParameterInfo implements ThreadPoolParameter, Serializabl
/**
* Keep alive time
*/
private Integer keepAliveTime;
private Long keepAliveTime;

/**
* Execute time out
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void assertGetPoolContent() {
":1,\"capacityAlarm\":80,\"livenessAlarm\":80,\"allowCoreThreadTimeOut\":1}";
ThreadPoolParameterInfo threadPoolParameterInfo = ThreadPoolParameterInfo.builder().tenantId("prescription")
.itemId("dynamic-threadpool-example").tpId("message-consume").content("描述信息").corePoolSize(1)
.maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513).executeTimeOut(null).rejectedType(4)
.maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513L).executeTimeOut(null).rejectedType(4)
.isAlarm(1).capacityAlarm(80).livenessAlarm(80).allowCoreThreadTimeOut(1).build();
Assert.isTrue(testText.equals(ContentUtil.getPoolContent(threadPoolParameterInfo)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void assetGetTpContentMd5() {
String md5Result = "ef5ea7cb47377fb9fb85a7125e76715d";
ThreadPoolParameterInfo threadPoolParameterInfo = ThreadPoolParameterInfo.builder().tenantId("prescription")
.itemId("dynamic-threadpool-example").tpId("message-consume").content("描述信息").corePoolSize(1)
.maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513).executeTimeOut(null).rejectedType(4)
.maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513L).executeTimeOut(null).rejectedType(4)
.isAlarm(1).capacityAlarm(80).livenessAlarm(80).allowCoreThreadTimeOut(1).build();
Assert.isTrue(md5Result.equals(Md5Util.getTpContentMd5(threadPoolParameterInfo)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public void assertSingletonGet() {
Assert.assertEquals("hippo4j", Singleton.get("userName"));
ThreadPoolParameterInfo threadPoolParameterInfo = ThreadPoolParameterInfo.builder().tenantId("prescription")
.itemId("dynamic-threadpool-example").tpId("message-consume").content("描述信息").corePoolSize(1)
.maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513).executeTimeOut(null).rejectedType(4)
.maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513L).executeTimeOut(null).rejectedType(4)
.isAlarm(1).capacityAlarm(80).livenessAlarm(80).allowCoreThreadTimeOut(1).build();
Singleton.put(threadPoolParameterInfo);
Assert.assertEquals(threadPoolParameterInfo, Singleton.get(ThreadPoolParameterInfo.class.getName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
*/
@Slf4j
@RequiredArgsConstructor
public class LarkSendMessageHandler implements SendMessageHandler<AlarmNotifyRequest, ChangeParameterNotifyRequest> {
public class LarkSendMessageHandler implements SendMessageHandler {

@Override
public String getType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
/**
* Abstract robot send message handler.
*/
public abstract class AbstractRobotSendMessageHandler implements SendMessageHandler<AlarmNotifyRequest, ChangeParameterNotifyRequest> {
public abstract class AbstractRobotSendMessageHandler implements SendMessageHandler {

/**
* Build message actual content.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.hippo4j.message.request;

import cn.hippo4j.message.request.base.BaseNotifyRequest;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Change parameter notify request for web thread pool.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class WebChangeParameterNotifyRequest extends BaseNotifyRequest {

private String active;

private String appName;

private String identify;

private Integer beforeCorePoolSize;

private Integer nowCorePoolSize;

private Integer beforeMaximumPoolSize;

private Integer nowMaximumPoolSize;

private Long beforeKeepAliveTime;

private Long nowKeepAliveTime;

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package cn.hippo4j.message.request.base;

import cn.hippo4j.common.api.NotifyRequest;
import cn.hippo4j.message.enums.NotifyTypeEnum;
import lombok.Data;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import cn.hippo4j.message.enums.NotifyTypeEnum;
import cn.hippo4j.message.request.AlarmNotifyRequest;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.request.WebChangeParameterNotifyRequest;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -53,11 +54,7 @@ public class Hippo4jBaseSendMessageService implements Hippo4jSendMessageService,
@Override
public void sendAlarmMessage(NotifyTypeEnum typeEnum, AlarmNotifyRequest alarmNotifyRequest) {
String threadPoolId = alarmNotifyRequest.getThreadPoolId();
String buildKey = new StringBuilder()
.append(threadPoolId)
.append("+")
.append("ALARM")
.toString();
String buildKey = generateAlarmKey(threadPoolId);
List<NotifyConfigDTO> notifyList = notifyConfigs.get(buildKey);
if (CollectionUtil.isEmpty(notifyList)) {
return;
Expand All @@ -82,11 +79,7 @@ public void sendAlarmMessage(NotifyTypeEnum typeEnum, AlarmNotifyRequest alarmNo
@Override
public void sendChangeMessage(ChangeParameterNotifyRequest changeParameterNotifyRequest) {
String threadPoolId = changeParameterNotifyRequest.getThreadPoolId();
String buildKey = new StringBuilder()
.append(threadPoolId)
.append("+")
.append("CONFIG")
.toString();
String buildKey = generateConfigKey(threadPoolId);
List<NotifyConfigDTO> notifyList = notifyConfigs.get(buildKey);
if (CollectionUtil.isEmpty(notifyList)) {
log.warn("[{}] Please configure alarm notification on the server.", threadPoolId);
Expand All @@ -106,6 +99,45 @@ public void sendChangeMessage(ChangeParameterNotifyRequest changeParameterNotify
});
}

@Override
public void sendChangeMessage(WebChangeParameterNotifyRequest webChangeParameterNotifyRequest) {
String threadPoolId = webChangeParameterNotifyRequest.getThreadPoolId();
String buildKey = generateConfigKey(threadPoolId);
List<NotifyConfigDTO> notifyList = notifyConfigs.get(buildKey);
if (CollectionUtil.isEmpty(notifyList)) {
log.warn("[{}] Please configure alarm notification on the server.", threadPoolId);
return;
}
notifyList.forEach(each -> {
try {
SendMessageHandler messageHandler = sendMessageHandlers.get(each.getPlatform());
if (messageHandler == null) {
log.warn("[{}] Please configure alarm notification on the server.", threadPoolId);
return;
}
messageHandler.sendWebChangeMessage(each, webChangeParameterNotifyRequest);
} catch (Exception ex) {
log.warn("Failed to send thread pool change notification. key: [{}]", threadPoolId, ex);
}
});
}

private String generateConfigKey(String threadPoolId) {
return new StringBuilder()
.append(threadPoolId)
.append("+")
.append("CONFIG")
.toString();
}

private String generateAlarmKey(String threadPoolId) {
return new StringBuilder()
.append(threadPoolId)
.append("+")
.append("ALARM")
.toString();
}

/**
* Is send alarm.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import cn.hippo4j.message.enums.NotifyTypeEnum;
import cn.hippo4j.message.request.AlarmNotifyRequest;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.request.WebChangeParameterNotifyRequest;

/**
* Hippo-4j send message service.
Expand All @@ -40,4 +41,11 @@ public interface Hippo4jSendMessageService {
* @param changeParameterNotifyRequest change parameter notify request
*/
void sendChangeMessage(ChangeParameterNotifyRequest changeParameterNotifyRequest);

/**
* Send web thread pool parameter change notification.
*
* @param webChangeParameterNotifyRequest change parameter notify request
*/
void sendChangeMessage(WebChangeParameterNotifyRequest webChangeParameterNotifyRequest);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

import cn.hippo4j.message.dto.NotifyConfigDTO;
import cn.hippo4j.common.api.NotifyRequest;
import cn.hippo4j.message.request.AlarmNotifyRequest;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.request.WebChangeParameterNotifyRequest;

/**
* Send message handler.
*/
public interface SendMessageHandler<T extends NotifyRequest, R extends NotifyRequest> {
public interface SendMessageHandler {

/**
* Get the message send type.
Expand All @@ -38,13 +41,23 @@ public interface SendMessageHandler<T extends NotifyRequest, R extends NotifyReq
* @param notifyConfig notify config
* @param alarmNotifyRequest alarm notify request
*/
void sendAlarmMessage(NotifyConfigDTO notifyConfig, T alarmNotifyRequest);
void sendAlarmMessage(NotifyConfigDTO notifyConfig, AlarmNotifyRequest alarmNotifyRequest);

/**
* Send change message.
*
* @param notifyConfig notify config
* @param changeParameterNotifyRequest change parameter notify request
*/
void sendChangeMessage(NotifyConfigDTO notifyConfig, R changeParameterNotifyRequest);
void sendChangeMessage(NotifyConfigDTO notifyConfig, ChangeParameterNotifyRequest changeParameterNotifyRequest);

/**
* Send web change message.
*
* @param notifyConfig notify config
* @param changeParameterNotifyRequest parameter notify request
*/
default void sendWebChangeMessage(NotifyConfigDTO notifyConfig, WebChangeParameterNotifyRequest changeParameterNotifyRequest) throws IllegalAccessException {
throw new IllegalAccessException("Please implement this method before using it.");
}
}
Loading