Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add thread pool automatic registration function #494

Merged
merged 2 commits into from
Aug 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public class Constants {

public static final String REGISTER_ADAPTER_PATH = REGISTER_ADAPTER_BASE_PATH + "/register";

public static final String REGISTER_DYNAMIC_THREAD_POOL_PATH = CONFIG_CONTROLLER_PATH + "/register";

public static final String HEALTH_CHECK_PATH = BASE_PATH + "/health/check";

public static final String PROBE_MODIFY_REQUEST = "Listening-Configs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,33 +38,33 @@ public class ThreadPoolParameterInfo implements ThreadPoolParameter, Serializabl
private static final long serialVersionUID = -7123935122108553864L;

/**
* tenantId
* Tenant id
*/
private String tenantId;

/**
* itemId
* Item id
*/
private String itemId;

/**
* tpId
* Thread-pool id
*/
private String tpId;

/**
* content
* Content
*/
private String content;

/**
* coreSize
* Core size
*/
@Deprecated
private Integer coreSize;

/**
* maxSize
* Max size
*/
@Deprecated
private Integer maxSize;
Expand All @@ -80,42 +80,42 @@ public class ThreadPoolParameterInfo implements ThreadPoolParameter, Serializabl
private Integer maximumPoolSize;

/**
* queueType
* Queue type
*/
private Integer queueType;

/**
* capacity
* Capacity
*/
private Integer capacity;

/**
* keepAliveTime
* Keep alive time
*/
private Integer keepAliveTime;

/**
* rejectedType
* Rejected type
*/
private Integer rejectedType;

/**
* isAlarm
* Is alarm
*/
private Integer isAlarm;

/**
* capacityAlarm
* Capacity alarm
*/
private Integer capacityAlarm;

/**
* livenessAlarm
* Liveness alarm
*/
private Integer livenessAlarm;

/**
* allowCoreThreadTimeOut
* Allow core thread timeout
*/
private Integer allowCoreThreadTimeOut;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package cn.hippo4j.common.model.register;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Dynamic thread-pool register parameter.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DynamicThreadPoolRegisterParameter {

/**
* Thread-pool id
* Empty or empty strings are not allowed, and `+` signs are not allowed
*/
private String threadPoolId;

/**
* Content
*/
private String content;

/**
* Core pool size
*/
private Integer corePoolSize;

/**
* Maximum pool size
*/
private Integer maximumPoolSize;

/**
* Queue type
*/
private Integer queueType;

/**
* Capacity
*/
private Integer capacity;

/**
* Keep alive time
*/
private Integer keepAliveTime;

/**
* Rejected type
*/
private Integer rejectedType;

/**
* Is alarm
*/
private Integer isAlarm;

/**
* Capacity alarm
*/
private Integer capacityAlarm;

/**
* Liveness alarm
*/
private Integer livenessAlarm;

/**
* Allow core thread timeout
*/
private Integer allowCoreThreadTimeOut;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package cn.hippo4j.common.model.register;

import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.concurrent.ThreadPoolExecutor;

/**
* Dynamic thread-pool register wrapper.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DynamicThreadPoolRegisterWrapper {

/**
* Tenant id
*/
private String tenantId;

/**
* Item id
*/
private String itemId;

/**
* Update if exists
*/
private Boolean updateIfExists = Boolean.TRUE;

/**
* Dynamic thread-pool executor
*/
@JsonIgnore
private ThreadPoolExecutor dynamicThreadPoolExecutor;

/**
* Dynamic thread-pool register parameter
*/
private DynamicThreadPoolRegisterParameter dynamicThreadPoolRegisterParameter;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@

package cn.hippo4j.config.controller;

import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import cn.hippo4j.config.model.ConfigAllInfo;
import cn.hippo4j.config.model.ConfigInfoBase;
import cn.hippo4j.config.service.ConfigCacheService;
import cn.hippo4j.config.service.ConfigServletInner;
import cn.hippo4j.config.toolkit.Md5ConfigUtil;
import cn.hippo4j.config.service.biz.ConfigService;
import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import cn.hippo4j.config.toolkit.Md5ConfigUtil;
import cn.hutool.core.util.StrUtil;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
Expand Down Expand Up @@ -91,4 +92,10 @@ public Result removeConfigCache(@RequestBody Map<String, String> bodyMap) {
}
return Results.success();
}

@PostMapping("/register")
public Result register(@RequestBody DynamicThreadPoolRegisterWrapper registerWrapper) {
configService.register(registerWrapper);
return Results.success();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.fasterxml.jackson.annotation.JsonAlias;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;

Expand Down Expand Up @@ -56,11 +57,13 @@ public class ConfigInfoBase implements Serializable {
/**
* coreSize
*/
@JsonAlias("corePoolSize")
private Integer coreSize;

/**
* maxSize
*/
@JsonAlias("maximumPoolSize")
private Integer maxSize;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void addSubscriber(AbstractSubscriber subscriber) {
public boolean publish(AbstractEvent event) {
boolean success = this.queue.offer(event);
if (!success) {
log.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
log.warn("Unable to plug in due to interruption, synchronize sending time, event: {}", event);
receiveEvent(event);
return true;
}
Expand All @@ -130,7 +130,7 @@ public void notifySubscriber(AbstractSubscriber subscriber, AbstractEvent event)
try {
job.run();
} catch (Throwable e) {
log.error("Event callback exception : {}", e);
log.error("Event callback exception: {}", e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class NotifyCenter {
publisher.init(cls, buffer);
return publisher;
} catch (Throwable ex) {
log.error("Service class newInstance has error : {}", ex);
log.error("Service class newInstance has error: {}", ex);
throw new RuntimeException(ex);
}
};
Expand Down Expand Up @@ -98,7 +98,7 @@ public static boolean publishEvent(final AbstractEvent event) {
try {
return publishEvent(event.getClass(), event);
} catch (Throwable ex) {
log.error("There was an exception to the message publishing : {}", ex);
log.error("There was an exception to the message publishing: {}", ex);
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private synchronized static void coarseRemove(String coarse) {
List<String> identificationList = MapUtil.parseMapForFilter(CLIENT_CONFIG_CACHE, coarse);
for (String cacheMapKey : identificationList) {
Map<String, CacheItem> removeCacheItem = CLIENT_CONFIG_CACHE.remove(cacheMapKey);
log.info("Remove invalidated config cache. config info : {}", JSONUtil.toJSONString(removeCacheItem));
log.info("Remove invalidated config cache. config info: {}", JSONUtil.toJSONString(removeCacheItem));
}
}

Expand All @@ -189,7 +189,7 @@ static class ClearConfigCache implements Observer<String> {

@Override
public void accept(ObserverMessage<String> observerMessage) {
log.info("Clean up the configuration cache. Key : {}", observerMessage.message());
log.info("Clean up the configuration cache. Key: {}", observerMessage.message());
coarseRemove(observerMessage.message());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package cn.hippo4j.config.service.biz;

import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.config.model.ConfigAllInfo;

/**
Expand Down Expand Up @@ -50,4 +51,11 @@ public interface ConfigService {
* @param configAllInfo
*/
void insertOrUpdate(String identify, boolean isChangeNotice, ConfigAllInfo configAllInfo);

/**
* Register.
*
* @param registerWrapper
*/
void register(DynamicThreadPoolRegisterWrapper registerWrapper);
}
Loading