Skip to content

Commit

Permalink
Merge 17b62f7 into ffaa4fb
Browse files Browse the repository at this point in the history
  • Loading branch information
yanrongzhen authored Apr 10, 2023
2 parents ffaa4fb + 17b62f7 commit d7d221f
Show file tree
Hide file tree
Showing 29 changed files with 373 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,10 @@
package cn.hippo4j.adapter.web.jetty;

import cn.hippo4j.adapter.web.DefaultAbstractWebThreadPoolService;
import cn.hippo4j.adapter.web.IWebThreadPoolHandlerSupport;
import cn.hippo4j.common.constant.ChangeThreadPoolConstants;
import cn.hippo4j.common.enums.WebContainerEnum;
import cn.hippo4j.common.model.ThreadPoolBaseInfo;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import cn.hippo4j.common.toolkit.ReflectUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.springframework.boot.web.embedded.jetty.JettyWebServer;
import org.springframework.boot.web.server.WebServer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public ThreadPoolParameter getWebThreadPoolParameter() {
long keepAliveTime = tomcatThreadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS);
parameterInfo.setCoreSize(minThreads);
parameterInfo.setMaxSize(maxThreads);
parameterInfo.setKeepAliveTime((int) keepAliveTime);
parameterInfo.setKeepAliveTime(keepAliveTime);
} catch (Exception ex) {
log.error("Failed to get the tomcat thread pool parameter.", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@
import java.util.concurrent.Executor;

import cn.hippo4j.adapter.web.DefaultAbstractWebThreadPoolService;
import cn.hippo4j.adapter.web.IWebThreadPoolHandlerSupport;
import cn.hippo4j.common.enums.WebContainerEnum;
import cn.hippo4j.common.model.ThreadPoolBaseInfo;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import io.undertow.Undertow;
import lombok.extern.slf4j.Slf4j;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public ThreadPoolParameter getWebThreadPoolParameter() {
XnioWorker xnioWorker = (XnioWorker) executor;
int minThreads = xnioWorker.getOption(Options.WORKER_TASK_CORE_THREADS);
int maxThreads = xnioWorker.getOption(Options.WORKER_TASK_MAX_THREADS);
int keepAliveTime = xnioWorker.getOption(Options.WORKER_TASK_KEEPALIVE);
long keepAliveTime = xnioWorker.getOption(Options.WORKER_TASK_KEEPALIVE);
parameterInfo.setCoreSize(minThreads);
parameterInfo.setMaxSize(maxThreads);
parameterInfo.setKeepAliveTime(keepAliveTime);
Expand Down Expand Up @@ -135,7 +135,7 @@ public void updateWebThreadPool(ThreadPoolParameterInfo threadPoolParameterInfo)
XnioWorker xnioWorker = (XnioWorker) executor;
Integer coreSize = threadPoolParameterInfo.corePoolSizeAdapt();
Integer maxSize = threadPoolParameterInfo.maximumPoolSizeAdapt();
Integer keepAliveTime = threadPoolParameterInfo.getKeepAliveTime();
int keepAliveTime = threadPoolParameterInfo.getKeepAliveTime().intValue();
int originalCoreSize = xnioWorker.getOption(Options.WORKER_TASK_CORE_THREADS);
int originalMaximumPoolSize = xnioWorker.getOption(Options.WORKER_TASK_MAX_THREADS);
int originalKeepAliveTime = xnioWorker.getOption(Options.WORKER_TASK_KEEPALIVE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package cn.hippo4j.config.springboot.starter.config;
package cn.hippo4j.common.api;

import lombok.AllArgsConstructor;
import lombok.Data;
Expand All @@ -27,7 +27,7 @@
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DynamicThreadPoolNotifyProperties {
public class ExecutorNotifyProperties {

/**
* Thread pool run alarm interval. unit: s
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.hippo4j.common.api;

/**
* Interface for thread pool configuration.
*/
public interface IExecutorProperties {

/**
* Thread pool id
*/
String getThreadPoolId();

/**
* Core pool size
*/
Integer getCorePoolSize();

/**
* Maximum pool size
*/
Integer getMaximumPoolSize();

/**
* Keep alive time
*/
Long getKeepAliveTime();

/**
* Notify configs
*/
ExecutorNotifyProperties getNotify();
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public interface ThreadPoolParameter {
*
* @return
*/
Integer getKeepAliveTime();
Long getKeepAliveTime();

/**
* Get execute time out
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public class ThreadPoolParameterInfo implements ThreadPoolParameter, Serializabl
/**
* Keep alive time
*/
private Integer keepAliveTime;
private Long keepAliveTime;

/**
* Execute time out
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void assertGetPoolContent() {
":1,\"capacityAlarm\":80,\"livenessAlarm\":80,\"allowCoreThreadTimeOut\":1}";
ThreadPoolParameterInfo threadPoolParameterInfo = ThreadPoolParameterInfo.builder().tenantId("prescription")
.itemId("dynamic-threadpool-example").tpId("message-consume").content("描述信息").corePoolSize(1)
.maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513).executeTimeOut(null).rejectedType(4)
.maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513L).executeTimeOut(null).rejectedType(4)
.isAlarm(1).capacityAlarm(80).livenessAlarm(80).allowCoreThreadTimeOut(1).build();
Assert.isTrue(testText.equals(ContentUtil.getPoolContent(threadPoolParameterInfo)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void assetGetTpContentMd5() {
String md5Result = "ef5ea7cb47377fb9fb85a7125e76715d";
ThreadPoolParameterInfo threadPoolParameterInfo = ThreadPoolParameterInfo.builder().tenantId("prescription")
.itemId("dynamic-threadpool-example").tpId("message-consume").content("描述信息").corePoolSize(1)
.maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513).executeTimeOut(null).rejectedType(4)
.maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513L).executeTimeOut(null).rejectedType(4)
.isAlarm(1).capacityAlarm(80).livenessAlarm(80).allowCoreThreadTimeOut(1).build();
Assert.isTrue(md5Result.equals(Md5Util.getTpContentMd5(threadPoolParameterInfo)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public void assertSingletonGet() {
Assert.assertEquals("hippo4j", Singleton.get("userName"));
ThreadPoolParameterInfo threadPoolParameterInfo = ThreadPoolParameterInfo.builder().tenantId("prescription")
.itemId("dynamic-threadpool-example").tpId("message-consume").content("描述信息").corePoolSize(1)
.maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513).executeTimeOut(null).rejectedType(4)
.maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513L).executeTimeOut(null).rejectedType(4)
.isAlarm(1).capacityAlarm(80).livenessAlarm(80).allowCoreThreadTimeOut(1).build();
Singleton.put(threadPoolParameterInfo);
Assert.assertEquals(threadPoolParameterInfo, Singleton.get(ThreadPoolParameterInfo.class.getName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
*/
@Slf4j
@RequiredArgsConstructor
public class LarkSendMessageHandler implements SendMessageHandler<AlarmNotifyRequest, ChangeParameterNotifyRequest> {
public class LarkSendMessageHandler implements SendMessageHandler {

@Override
public String getType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
/**
* Abstract robot send message handler.
*/
public abstract class AbstractRobotSendMessageHandler implements SendMessageHandler<AlarmNotifyRequest, ChangeParameterNotifyRequest> {
public abstract class AbstractRobotSendMessageHandler implements SendMessageHandler {

/**
* Build message actual content.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.hippo4j.message.request;

import cn.hippo4j.message.request.base.BaseNotifyRequest;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Change parameter notify request for web thread pool.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class WebChangeParameterNotifyRequest extends BaseNotifyRequest {

private String active;

private String appName;

private String identify;

private Integer beforeCorePoolSize;

private Integer nowCorePoolSize;

private Integer beforeMaximumPoolSize;

private Integer nowMaximumPoolSize;

private Long beforeKeepAliveTime;

private Long nowKeepAliveTime;

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package cn.hippo4j.message.request.base;

import cn.hippo4j.common.api.NotifyRequest;
import cn.hippo4j.message.enums.NotifyTypeEnum;
import lombok.Data;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import cn.hippo4j.message.enums.NotifyTypeEnum;
import cn.hippo4j.message.request.AlarmNotifyRequest;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.request.WebChangeParameterNotifyRequest;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -53,11 +54,7 @@ public class Hippo4jBaseSendMessageService implements Hippo4jSendMessageService,
@Override
public void sendAlarmMessage(NotifyTypeEnum typeEnum, AlarmNotifyRequest alarmNotifyRequest) {
String threadPoolId = alarmNotifyRequest.getThreadPoolId();
String buildKey = new StringBuilder()
.append(threadPoolId)
.append("+")
.append("ALARM")
.toString();
String buildKey = generateAlarmKey(threadPoolId);
List<NotifyConfigDTO> notifyList = notifyConfigs.get(buildKey);
if (CollectionUtil.isEmpty(notifyList)) {
return;
Expand All @@ -82,11 +79,7 @@ public void sendAlarmMessage(NotifyTypeEnum typeEnum, AlarmNotifyRequest alarmNo
@Override
public void sendChangeMessage(ChangeParameterNotifyRequest changeParameterNotifyRequest) {
String threadPoolId = changeParameterNotifyRequest.getThreadPoolId();
String buildKey = new StringBuilder()
.append(threadPoolId)
.append("+")
.append("CONFIG")
.toString();
String buildKey = generateConfigKey(threadPoolId);
List<NotifyConfigDTO> notifyList = notifyConfigs.get(buildKey);
if (CollectionUtil.isEmpty(notifyList)) {
log.warn("[{}] Please configure alarm notification on the server.", threadPoolId);
Expand All @@ -106,6 +99,45 @@ public void sendChangeMessage(ChangeParameterNotifyRequest changeParameterNotify
});
}

@Override
public void sendChangeMessage(WebChangeParameterNotifyRequest webChangeParameterNotifyRequest) {
String threadPoolId = webChangeParameterNotifyRequest.getThreadPoolId();
String buildKey = generateConfigKey(threadPoolId);
List<NotifyConfigDTO> notifyList = notifyConfigs.get(buildKey);
if (CollectionUtil.isEmpty(notifyList)) {
log.warn("[{}] Please configure alarm notification on the server.", threadPoolId);
return;
}
notifyList.forEach(each -> {
try {
SendMessageHandler messageHandler = sendMessageHandlers.get(each.getPlatform());
if (messageHandler == null) {
log.warn("[{}] Please configure alarm notification on the server.", threadPoolId);
return;
}
messageHandler.sendWebChangeMessage(each, webChangeParameterNotifyRequest);
} catch (Exception ex) {
log.warn("Failed to send thread pool change notification. key: [{}]", threadPoolId, ex);
}
});
}

private String generateConfigKey(String threadPoolId) {
return new StringBuilder()
.append(threadPoolId)
.append("+")
.append("CONFIG")
.toString();
}

private String generateAlarmKey(String threadPoolId) {
return new StringBuilder()
.append(threadPoolId)
.append("+")
.append("ALARM")
.toString();
}

/**
* Is send alarm.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import cn.hippo4j.message.enums.NotifyTypeEnum;
import cn.hippo4j.message.request.AlarmNotifyRequest;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.request.WebChangeParameterNotifyRequest;

/**
* Hippo-4j send message service.
Expand All @@ -40,4 +41,11 @@ public interface Hippo4jSendMessageService {
* @param changeParameterNotifyRequest change parameter notify request
*/
void sendChangeMessage(ChangeParameterNotifyRequest changeParameterNotifyRequest);

/**
* Send web thread pool parameter change notification.
*
* @param webChangeParameterNotifyRequest change parameter notify request
*/
void sendChangeMessage(WebChangeParameterNotifyRequest webChangeParameterNotifyRequest);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

import cn.hippo4j.message.dto.NotifyConfigDTO;
import cn.hippo4j.common.api.NotifyRequest;
import cn.hippo4j.message.request.AlarmNotifyRequest;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.request.WebChangeParameterNotifyRequest;

/**
* Send message handler.
*/
public interface SendMessageHandler<T extends NotifyRequest, R extends NotifyRequest> {
public interface SendMessageHandler {

/**
* Get the message send type.
Expand All @@ -38,13 +41,23 @@ public interface SendMessageHandler<T extends NotifyRequest, R extends NotifyReq
* @param notifyConfig notify config
* @param alarmNotifyRequest alarm notify request
*/
void sendAlarmMessage(NotifyConfigDTO notifyConfig, T alarmNotifyRequest);
void sendAlarmMessage(NotifyConfigDTO notifyConfig, AlarmNotifyRequest alarmNotifyRequest);

/**
* Send change message.
*
* @param notifyConfig notify config
* @param changeParameterNotifyRequest change parameter notify request
*/
void sendChangeMessage(NotifyConfigDTO notifyConfig, R changeParameterNotifyRequest);
void sendChangeMessage(NotifyConfigDTO notifyConfig, ChangeParameterNotifyRequest changeParameterNotifyRequest);

/**
* Send web change message.
*
* @param notifyConfig notify config
* @param changeParameterNotifyRequest parameter notify request
*/
default void sendWebChangeMessage(NotifyConfigDTO notifyConfig, WebChangeParameterNotifyRequest changeParameterNotifyRequest) throws IllegalAccessException {
throw new IllegalAccessException("Please implement this method before using it.");
}
}
Loading

0 comments on commit d7d221f

Please sign in to comment.