Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix 多个实例导致多个守护线程的问题,修改成单例+定时任务线程池处理 #2663

Merged
merged 1 commit into from
May 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
* <pre>
* 默认消息重复检查器.
* 将每个消息id保存在内存里,每隔5秒清理已经过期的消息id,每个消息id的过期时间是15秒
* 替换类WxMessageInMemoryDuplicateCheckerSingleton
* </pre>
*
* @author Daniel Qian
*/
@Deprecated
public class WxMessageInMemoryDuplicateChecker implements WxMessageDuplicateChecker {

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package me.chanjar.weixin.common.api;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* @author jiangby
* @version 1.0
* <p>
* 消息去重,记录消息ID首次出现时的时间戳,
* 15S后定时任务触发时废除该记录消息ID
* </p>
* @date 2022/5/26 1:32
*/
@Slf4j
public class WxMessageInMemoryDuplicateCheckerSingleton implements WxMessageDuplicateChecker {

/**
* 一个消息ID在内存的过期时间:15秒.
*/
private static final Long TIME_TO_LIVE = 15L;

/**
* 每隔多少周期检查消息ID是否过期:5秒.
*/
private static final Long CLEAR_PERIOD = 5L;

/**
* 线程池
*/
private static final ScheduledThreadPoolExecutor SCHEDULED_THREAD_POOL_EXECUTOR = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder().setNameFormat("wxMessage-memory-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy());

/**
* 消息id->消息时间戳的map.
*/
private static final ConcurrentHashMap<String, Long> MSG_ID_2_TIMESTAMP = new ConcurrentHashMap<>();

static {
SCHEDULED_THREAD_POOL_EXECUTOR.scheduleAtFixedRate(() -> {
try {
Long now = System.currentTimeMillis();
MSG_ID_2_TIMESTAMP.entrySet().removeIf(entry -> now - entry.getValue() > TIME_TO_LIVE * 1000);
} catch (Exception ex) {
log.error("重复消息去重任务出现异常", ex);
}
}, 1, CLEAR_PERIOD, TimeUnit.SECONDS);
}

/**
* 私有化构造方法,避免外部调用
*/
private WxMessageInMemoryDuplicateCheckerSingleton() {
}

/**
* 获取单例
*
* @return 单例对象
*/
public static WxMessageInMemoryDuplicateCheckerSingleton getInstance() {
return WxMessageInnerClass.CHECKER_SINGLETON;
}

/**
* 内部类实现单例
*/
private static class WxMessageInnerClass {
static final WxMessageInMemoryDuplicateCheckerSingleton CHECKER_SINGLETON = new WxMessageInMemoryDuplicateCheckerSingleton();
}

/**
* messageId是否重复
*
* @param messageId messageId
* @return 是否
*/
@Override
public boolean isDuplicate(String messageId) {
if (messageId == null) {
return false;
}
Long timestamp = MSG_ID_2_TIMESTAMP.putIfAbsent(messageId, System.currentTimeMillis());
return timestamp != null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package me.chanjar.weixin.common.api;

import org.testng.annotations.Test;

import java.util.concurrent.TimeUnit;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

/**
* @author jiangby
* @version 1.0
* @description: 作用
* @date 2022/5/26 1:46
*/
@Test
public class WxMessageInMemoryDuplicateCheckerSingletonTest {

private static WxMessageInMemoryDuplicateCheckerSingleton checkerSingleton = WxMessageInMemoryDuplicateCheckerSingleton.getInstance();

public void test() throws InterruptedException {
Long[] msgIds = new Long[]{1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L};

// 第一次检查
for (Long msgId : msgIds) {
boolean result = checkerSingleton.isDuplicate(String.valueOf(msgId));
assertFalse(result);
}

// 初始化后1S进行检查 每五秒检查一次,过期时间为15秒,过15秒再检查
TimeUnit.SECONDS.sleep(15);
for (Long msgId : msgIds) {
boolean result = checkerSingleton.isDuplicate(String.valueOf(msgId));
assertTrue(result);
}

// 过6秒再检查
TimeUnit.SECONDS.sleep(6);
for (Long msgId : msgIds) {
boolean result = checkerSingleton.isDuplicate(String.valueOf(msgId));
assertFalse(result);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import me.chanjar.weixin.common.api.WxErrorExceptionHandler;
import me.chanjar.weixin.common.api.WxMessageDuplicateChecker;
import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker;
import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateCheckerSingleton;
import me.chanjar.weixin.common.session.InternalSession;
import me.chanjar.weixin.common.session.InternalSessionManager;
import me.chanjar.weixin.common.session.WxSessionManager;
Expand Down Expand Up @@ -71,7 +72,7 @@ public WxCpMessageRouter(WxCpService wxCpService) {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxCpMessageRouter-pool-%d").build();
this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory);
this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance();
this.sessionManager = wxCpService.getSessionManager();
this.exceptionHandler = new LogExceptionHandler();
}
Expand All @@ -82,7 +83,7 @@ public WxCpMessageRouter(WxCpService wxCpService) {
public WxCpMessageRouter(WxCpService wxMpService, ExecutorService executorService) {
this.wxCpService = wxMpService;
this.executorService = executorService;
this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance();
this.sessionManager = wxCpService.getSessionManager();
this.exceptionHandler = new LogExceptionHandler();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import me.chanjar.weixin.common.api.WxErrorExceptionHandler;
import me.chanjar.weixin.common.api.WxMessageDuplicateChecker;
import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker;
import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateCheckerSingleton;
import me.chanjar.weixin.common.session.InternalSession;
import me.chanjar.weixin.common.session.InternalSessionManager;
import me.chanjar.weixin.common.session.WxSessionManager;
Expand Down Expand Up @@ -73,7 +74,7 @@ public WxCpTpMessageRouter(WxCpTpService wxCpTpService) {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxCpTpMessageRouter-pool-%d").build();
this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory);
this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance();
this.sessionManager = wxCpTpService.getSessionManager();
this.exceptionHandler = new LogExceptionHandler();
}
Expand All @@ -84,7 +85,7 @@ public WxCpTpMessageRouter(WxCpTpService wxCpTpService) {
public WxCpTpMessageRouter(WxCpTpService wxCpTpService, ExecutorService executorService) {
this.wxCpTpService = wxCpTpService;
this.executorService = executorService;
this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance();
this.sessionManager = wxCpTpService.getSessionManager();
this.exceptionHandler = new LogExceptionHandler();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import me.chanjar.weixin.common.api.WxErrorExceptionHandler;
import me.chanjar.weixin.common.api.WxMessageDuplicateChecker;
import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker;
import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateCheckerSingleton;
import me.chanjar.weixin.common.session.InternalSession;
import me.chanjar.weixin.common.session.InternalSessionManager;
import me.chanjar.weixin.common.session.StandardSessionManager;
Expand Down Expand Up @@ -48,7 +49,7 @@ public WxMaMessageRouter(WxMaService wxMaService) {
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory);
this.sessionManager = new StandardSessionManager();
this.exceptionHandler = new LogExceptionHandler();
this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance();
}

/**
Expand All @@ -59,7 +60,7 @@ public WxMaMessageRouter(WxMaService wxMaService, ExecutorService executorServic
this.executorService = executorService;
this.sessionManager = new StandardSessionManager();
this.exceptionHandler = new LogExceptionHandler();
this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import me.chanjar.weixin.common.api.WxErrorExceptionHandler;
import me.chanjar.weixin.common.api.WxMessageDuplicateChecker;
import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker;
import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateCheckerSingleton;
import me.chanjar.weixin.common.session.InternalSession;
import me.chanjar.weixin.common.session.InternalSessionManager;
import me.chanjar.weixin.common.session.StandardSessionManager;
Expand Down Expand Up @@ -72,7 +73,7 @@ public WxMpMessageRouter(WxMpService wxMpService) {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxMpMessageRouter-pool-%d").build();
this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory);
this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance();
this.sessionManager = new StandardSessionManager();
this.exceptionHandler = new LogExceptionHandler();
}
Expand All @@ -83,7 +84,7 @@ public WxMpMessageRouter(WxMpService wxMpService) {
public WxMpMessageRouter(WxMpService wxMpService, ExecutorService executorService) {
this.wxMpService = wxMpService;
this.executorService = executorService;
this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance();
this.sessionManager = new StandardSessionManager();
this.exceptionHandler = new LogExceptionHandler();
}
Expand Down