Skip to content

Commit

Permalink
Merge pull request #104 from qqxx6661/dev_1.6.x
Browse files Browse the repository at this point in the history
custom thread pool support
  • Loading branch information
qqxx6661 authored May 26, 2024
2 parents c71d944 + deb9dd7 commit a38bae4
Show file tree
Hide file tree
Showing 23 changed files with 378 additions and 35 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ jobs:
path: ~/.m2
key: ${{ env.cache-name }}-${{ hashFiles('./log-record-starter/pom.xml') }}
restore-keys: ${{ env.cache-name }}-
- name: Install current version to local repository
run: mvn install -DskipTests -Dgpg.skip
working-directory: ./log-record-core
- name: Test with Maven
working-directory: ./log-record-starter
run: mvn -V --no-transfer-progress test
Expand Down
54 changes: 43 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ public Response<T> function(Request request) {
- 支持自动重试和兜底处理:支持配置重试次数和处理失败兜底逻辑`SPI`
- 支持控制切面执行时机(方法执行前后)
- 支持自定义执行成功判断
- 支持非注解方式手动记录日志
- 自定义消息线程池
- 更多特性等你来发掘...

**日志实体(LogDTO)内包含:**
Expand Down Expand Up @@ -347,8 +349,9 @@ public Response<T> function(Request request) {
- [实体类`Diff`](#实体类Diff)
- [日志处理重试次数及兜底函数配置](#日志处理重试次数及兜底函数配置)
- [重复注解](#重复注解)
- [消息分发线程池配置](#消息分发线程池配置)
- [自定义消息线程池](#自定义消息线程池)
- [函数返回值记录开关](#函数返回值记录开关)
- [非注解方式手动记录日志](#非注解方式)
- [操作日志数据表结构推荐](#操作日志数据表结构推荐)
- [让注解支持`IDEA`自动补全](#让注解支持IDEA自动补全)

Expand Down Expand Up @@ -512,7 +515,7 @@ public Response<T> function(Request request) {
}
```

LogRecordContext内部使用TransmittableThreadLocal,在线程池中也可以读取到主线程的ThreadLocal
LogRecordContext内部使用TransmittableThreadLocal实现与主线程的ThreadLocal传递

### 自定义函数

Expand Down Expand Up @@ -795,20 +798,49 @@ public class LogRecordErrorHandlerServiceImpl implements LogRecordErrorHandlerSe

我们还加上了重复注解的支持,可以在一个方法上同时加多个`@OperationLog`**会保证按照`@OperationLog`从上到下的顺序输出日志**

### 消息分发线程池配置
### 自定义消息线程池

在组装好`logDTO`后,默认使用线程池对消息进行分发,发送至本地监听函数或者消息队列发送者。

**注意:`logDTO`的组装在切面中,该切面仍然在函数执行的线程中运行。**

可以使用如下配置:
starter提供了如下配置:

```properties
log-record.thread-pool.pool-size=4(线程池核心线程大小 默认为4)
log-record.thread-pool.enabled=true(线程池开关 默认为开启 若关闭则使用主线程进行消息处理发送)
log-record.thread-pool.enabled=true(线程池开关 默认为开启 若关闭则使用业务线程进行消息处理发送)
```

在组装好`logDTO`后,默认会使用线程池对消息进行处理,发送至本地监听函数或者消息队列发送者,也可以通过配置关闭线程池,让主线程执行全部消息处理逻辑。

**注意:`logDTO`的组装逻辑在切面中,该切面仍然在函数执行的线程中运行。**

默认线程池配置如下(拒绝策略为丢弃):

```java
return new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), THREAD_FACTORY, new ThreadPoolExecutor.AbortPolicy());
```

此外,还提供了用户传入自定义线程池的方式,用户可自行实现cn.monitor4all.logRecord.thread.ThreadPoolProvider,传入线程池。

示例:

```java
public class CustomThreadPoolProvider implements ThreadPoolProvider {

private static ThreadPoolExecutor EXECUTOR;

private static final ThreadFactory THREAD_FACTORY = new CustomizableThreadFactory("custom-log-record-");


private CustomThreadPoolProvider() {
log.info("CustomThreadPoolProvider init");
EXECUTOR = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), THREAD_FACTORY, new ThreadPoolExecutor.AbortPolicy());
}

@Override
public ThreadPoolExecutor buildLogRecordThreadPool() {
return EXECUTOR;
}
}
```

关闭使用线程池后,所有发送由主线程执行,带来的副作用是大量日志并发发送,会降低主线程处理效率。

### 函数返回值记录开关

Expand Down Expand Up @@ -893,7 +925,7 @@ public void testBizIdWithSpEL(String bizId) {

应用之间通过关键操作的日志消息,互相通知。

## 附录:Demo
## Demo

当你觉得用法不熟悉,可以查看单元测试用例,里面有最为详细且最全的使用示例。

Expand Down
2 changes: 1 addition & 1 deletion log-record-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<groupId>cn.monitor4all</groupId>
<artifactId>log-record-core</artifactId>
<version>1.6.2</version>
<version>1.6.3</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
import cn.monitor4all.logRecord.service.IOperatorIdGetService;
import cn.monitor4all.logRecord.service.LogRecordErrorHandlerService;
import cn.monitor4all.logRecord.thread.LogRecordThreadPool;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import cn.monitor4all.logRecord.util.JsonUtil;
import com.alibaba.ttl.TtlRunnable;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -128,7 +127,7 @@ public Object doAround(ProceedingJoinPoint pjp) throws Throwable {
logDTO.setSuccess(true);
}
if (annotation.recordReturnValue() && result != null) {
logDTO.setReturnStr(JSON.toJSONString(result));
logDTO.setReturnStr(JsonUtil.safeToJsonString(result));
}
});
} catch (Throwable throwableAfterFuncSuccess) {
Expand Down Expand Up @@ -305,7 +304,7 @@ private String parseParamToStringOrJson(String spel, StandardEvaluationContext c
Expression msgExpression = parser.parseExpression(spel);
Object obj = msgExpression.getValue(context, Object.class);
if (obj != null) {
return obj instanceof String ? (String) obj : JSON.toJSONString(obj, SerializerFeature.WriteMapNullValue);
return obj instanceof String ? (String) obj : JsonUtil.safeToJsonString(obj);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,11 @@ private static boolean isJsonArray(Object obj) {
private static Field[] getAllFields(Class<?> type) {
List<Field> fields = new ArrayList<>();
for (Class<?> c = type; c != null && !c.isSynthetic(); c = c.getSuperclass()) {
Collections.addAll(fields, c.getDeclaredFields());
for (Field field : c.getDeclaredFields()) {
if (!field.isSynthetic()) {
fields.add(field);
}
}
}
return fields.toArray(new Field[0]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import cn.monitor4all.logRecord.configuration.LogRecordProperties;
import cn.monitor4all.logRecord.constants.LogConstants;
import cn.monitor4all.logRecord.service.DataPipelineService;
import com.alibaba.fastjson.JSON;
import cn.monitor4all.logRecord.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -27,7 +27,7 @@ public class RabbitMqDataPipelineServiceImpl implements DataPipelineService {
@Override
public boolean createLog(LogDTO logDTO) {
log.info("LogRecord RabbitMq ready to send routingKey [{}] LogDTO [{}]", properties.getRabbitMqProperties().getRoutingKey(), logDTO);
rubeExchangeTemplate.convertAndSend(properties.getRabbitMqProperties().getRoutingKey(), JSON.toJSONString(logDTO));
rubeExchangeTemplate.convertAndSend(properties.getRabbitMqProperties().getRoutingKey(), JsonUtil.safeToJsonString(logDTO));
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import cn.monitor4all.logRecord.configuration.LogRecordProperties;
import cn.monitor4all.logRecord.constants.LogConstants;
import cn.monitor4all.logRecord.service.DataPipelineService;
import com.alibaba.fastjson.JSON;
import cn.monitor4all.logRecord.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
Expand All @@ -30,7 +30,7 @@ public class RocketMqDataPipelineServiceImpl implements DataPipelineService {
@Override
public boolean createLog(LogDTO logDTO) {
try {
Message msg = new Message(properties.getRocketMqProperties().getTopic(), properties.getRocketMqProperties().getTag(), (JSON.toJSONString(logDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));
Message msg = new Message(properties.getRocketMqProperties().getTopic(), properties.getRocketMqProperties().getTag(), (JsonUtil.safeToJsonString(logDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = defaultMqProducer.send(msg);
log.info("LogRecord RocketMq send LogDTO [{}] sendResult: [{}]", logDTO, sendResult);
return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package cn.monitor4all.logRecord.thread;

import cn.monitor4all.logRecord.configuration.LogRecordProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* 默认线程池提供者
*/
@Slf4j
public class DefaultThreadPoolProvider implements ThreadPoolProvider {

private final LogRecordProperties logRecordProperties;
private static final ThreadFactory THREAD_FACTORY = new CustomizableThreadFactory("log-record-");


public DefaultThreadPoolProvider(LogRecordProperties logRecordProperties) {
this.logRecordProperties = logRecordProperties;
}

@Override
public ThreadPoolExecutor buildLogRecordThreadPool() {
log.info("LogRecordThreadPool init poolSize [{}]", logRecordProperties.getThreadPool().getPoolSize());
int poolSize = logRecordProperties.getThreadPool().getPoolSize();
return new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), THREAD_FACTORY, new ThreadPoolExecutor.AbortPolicy());
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,31 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import java.util.concurrent.*;
import java.util.concurrent.ThreadPoolExecutor;

@Slf4j
@Component
@ConditionalOnProperty(name = "log-record.thread-pool.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties({LogRecordProperties.class})
public class LogRecordThreadPool {

private static final ThreadFactory THREAD_FACTORY = new CustomizableThreadFactory("log-record-");
private final ThreadPoolExecutor logRecordPoolExecutor;

private final ExecutorService LOG_RECORD_POOL_EXECUTOR;

public LogRecordThreadPool(LogRecordProperties logRecordProperties) {
log.info("LogRecordThreadPool init poolSize [{}]", logRecordProperties.getThreadPool().getPoolSize());
int poolSize = logRecordProperties.getThreadPool().getPoolSize();
this.LOG_RECORD_POOL_EXECUTOR = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
/**
* 操作日志主逻辑线程池
* 提供顺序:用户传入线程池 优先于 通过配置文件创建的默认线程池
*/
public LogRecordThreadPool(LogRecordProperties logRecordProperties, ApplicationContext applicationContext) {
ThreadPoolProvider threadPoolProvider = applicationContext.getBeanProvider(ThreadPoolProvider.class)
.getIfUnique(() -> new DefaultThreadPoolProvider(logRecordProperties));
this.logRecordPoolExecutor = threadPoolProvider.buildLogRecordThreadPool();
}

public ExecutorService getLogRecordPoolExecutor() {
return LOG_RECORD_POOL_EXECUTOR;
public ThreadPoolExecutor getLogRecordPoolExecutor() {
return logRecordPoolExecutor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package cn.monitor4all.logRecord.thread;


import java.util.concurrent.ThreadPoolExecutor;

/**
* 线程池提供者
*/
public interface ThreadPoolProvider {

/**
* 提供操作日志处理线程池
*/
ThreadPoolExecutor buildLogRecordThreadPool();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package cn.monitor4all.logRecord.util;


import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class JsonUtil {

public static String safeToJsonString(Object object) {
try {
return JSON.toJSONString(object);
} catch (Exception e) {
log.error("safeToJsonString error, object {}", object, e);
return object.toString();
}
}
}
4 changes: 2 additions & 2 deletions log-record-springboot3-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<groupId>cn.monitor4all</groupId>
<artifactId>log-record-springboot3-starter</artifactId>
<version>1.6.2</version>
<version>1.6.3</version>

<properties>
<maven.compiler.source>17</maven.compiler.source>
Expand All @@ -35,7 +35,7 @@
<dependency>
<groupId>cn.monitor4all</groupId>
<artifactId>log-record-core</artifactId>
<version>1.6.2</version>
<version>1.6.3</version>
</dependency>

<!-- 单元测试依赖 -->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package cn.monitor4all.logRecord.springboot3.test;


import cn.monitor4all.logRecord.bean.LogDTO;
import cn.monitor4all.logRecord.springboot3.test.service.OperatorIdGetService;
import cn.monitor4all.logRecord.springboot3.test.service.TestService;
import cn.monitor4all.logRecord.springboot3.LogRecordAutoConfiguration;
import cn.monitor4all.logRecord.springboot3.test.utils.TestHelper;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.context.annotation.PropertySource;
import org.springframework.test.context.ContextConfiguration;

/**
* 单元测试:自定义线程池
*/
@Slf4j
@SpringBootTest
@ContextConfiguration(classes = {
LogRecordAutoConfiguration.class,
OperatorIdGetService.class,
TestService.class,})
@PropertySource("classpath:testCustomThreadPool.properties")
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class OperationLogCustomThreadPoolTest {

@Autowired
private TestService testService;

/**
* 测试:用户传入自定义线程池
*/
@Test
public void testCustomThreadPool() {
TestHelper.addLock("testCustomThreadPool");
testService.testCustomThreadPool();
TestHelper.await("testCustomThreadPool");
LogDTO logDTO = TestHelper.getLogDTO("testCustomThreadPool");

Assertions.assertEquals(logDTO.getBizType(), "testCustomThreadPool");
}

}
Loading

0 comments on commit a38bae4

Please sign in to comment.