diff --git a/weixin-java-common/src/main/java/me/chanjar/weixin/common/api/WxMessageInMemoryDuplicateChecker.java b/weixin-java-common/src/main/java/me/chanjar/weixin/common/api/WxMessageInMemoryDuplicateChecker.java index 465f35434b..88c3aeae69 100644 --- a/weixin-java-common/src/main/java/me/chanjar/weixin/common/api/WxMessageInMemoryDuplicateChecker.java +++ b/weixin-java-common/src/main/java/me/chanjar/weixin/common/api/WxMessageInMemoryDuplicateChecker.java @@ -8,10 +8,12 @@ *
  * 默认消息重复检查器.
  * 将每个消息id保存在内存里,每隔5秒清理已经过期的消息id,每个消息id的过期时间是15秒
+ * 替换类WxMessageInMemoryDuplicateCheckerSingleton
  * 
* * @author Daniel Qian */ +@Deprecated public class WxMessageInMemoryDuplicateChecker implements WxMessageDuplicateChecker { /** diff --git a/weixin-java-common/src/main/java/me/chanjar/weixin/common/api/WxMessageInMemoryDuplicateCheckerSingleton.java b/weixin-java-common/src/main/java/me/chanjar/weixin/common/api/WxMessageInMemoryDuplicateCheckerSingleton.java new file mode 100644 index 0000000000..f275a2badc --- /dev/null +++ b/weixin-java-common/src/main/java/me/chanjar/weixin/common/api/WxMessageInMemoryDuplicateCheckerSingleton.java @@ -0,0 +1,91 @@ +package me.chanjar.weixin.common.api; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * @author jiangby + * @version 1.0 + *

+ * 消息去重,记录消息ID首次出现时的时间戳, + * 15S后定时任务触发时废除该记录消息ID + *

+ * @date 2022/5/26 1:32 + */ +@Slf4j +public class WxMessageInMemoryDuplicateCheckerSingleton implements WxMessageDuplicateChecker { + + /** + * 一个消息ID在内存的过期时间:15秒. + */ + private static final Long TIME_TO_LIVE = 15L; + + /** + * 每隔多少周期检查消息ID是否过期:5秒. + */ + private static final Long CLEAR_PERIOD = 5L; + + /** + * 线程池 + */ + private static final ScheduledThreadPoolExecutor SCHEDULED_THREAD_POOL_EXECUTOR = new ScheduledThreadPoolExecutor(1, + new ThreadFactoryBuilder().setNameFormat("wxMessage-memory-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy()); + + /** + * 消息id->消息时间戳的map. + */ + private static final ConcurrentHashMap MSG_ID_2_TIMESTAMP = new ConcurrentHashMap<>(); + + static { + SCHEDULED_THREAD_POOL_EXECUTOR.scheduleAtFixedRate(() -> { + try { + Long now = System.currentTimeMillis(); + MSG_ID_2_TIMESTAMP.entrySet().removeIf(entry -> now - entry.getValue() > TIME_TO_LIVE * 1000); + } catch (Exception ex) { + log.error("重复消息去重任务出现异常", ex); + } + }, 1, CLEAR_PERIOD, TimeUnit.SECONDS); + } + + /** + * 私有化构造方法,避免外部调用 + */ + private WxMessageInMemoryDuplicateCheckerSingleton() { + } + + /** + * 获取单例 + * + * @return 单例对象 + */ + public static WxMessageInMemoryDuplicateCheckerSingleton getInstance() { + return WxMessageInnerClass.CHECKER_SINGLETON; + } + + /** + * 内部类实现单例 + */ + private static class WxMessageInnerClass { + static final WxMessageInMemoryDuplicateCheckerSingleton CHECKER_SINGLETON = new WxMessageInMemoryDuplicateCheckerSingleton(); + } + + /** + * messageId是否重复 + * + * @param messageId messageId + * @return 是否 + */ + @Override + public boolean isDuplicate(String messageId) { + if (messageId == null) { + return false; + } + Long timestamp = MSG_ID_2_TIMESTAMP.putIfAbsent(messageId, System.currentTimeMillis()); + return timestamp != null; + } +} diff --git a/weixin-java-common/src/test/java/me/chanjar/weixin/common/api/WxMessageInMemoryDuplicateCheckerSingletonTest.java b/weixin-java-common/src/test/java/me/chanjar/weixin/common/api/WxMessageInMemoryDuplicateCheckerSingletonTest.java new file mode 100644 index 0000000000..d6f4ba2fac --- /dev/null +++ b/weixin-java-common/src/test/java/me/chanjar/weixin/common/api/WxMessageInMemoryDuplicateCheckerSingletonTest.java @@ -0,0 +1,45 @@ +package me.chanjar.weixin.common.api; + +import org.testng.annotations.Test; + +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +/** + * @author jiangby + * @version 1.0 + * @description: 作用 + * @date 2022/5/26 1:46 + */ +@Test +public class WxMessageInMemoryDuplicateCheckerSingletonTest { + + private static WxMessageInMemoryDuplicateCheckerSingleton checkerSingleton = WxMessageInMemoryDuplicateCheckerSingleton.getInstance(); + + public void test() throws InterruptedException { + Long[] msgIds = new Long[]{1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L}; + + // 第一次检查 + for (Long msgId : msgIds) { + boolean result = checkerSingleton.isDuplicate(String.valueOf(msgId)); + assertFalse(result); + } + + // 初始化后1S进行检查 每五秒检查一次,过期时间为15秒,过15秒再检查 + TimeUnit.SECONDS.sleep(15); + for (Long msgId : msgIds) { + boolean result = checkerSingleton.isDuplicate(String.valueOf(msgId)); + assertTrue(result); + } + + // 过6秒再检查 + TimeUnit.SECONDS.sleep(6); + for (Long msgId : msgIds) { + boolean result = checkerSingleton.isDuplicate(String.valueOf(msgId)); + assertFalse(result); + } + + } +} diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/message/WxCpMessageRouter.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/message/WxCpMessageRouter.java index b2327bdc6b..c027159bc2 100644 --- a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/message/WxCpMessageRouter.java +++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/message/WxCpMessageRouter.java @@ -5,6 +5,7 @@ import me.chanjar.weixin.common.api.WxErrorExceptionHandler; import me.chanjar.weixin.common.api.WxMessageDuplicateChecker; import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker; +import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateCheckerSingleton; import me.chanjar.weixin.common.session.InternalSession; import me.chanjar.weixin.common.session.InternalSessionManager; import me.chanjar.weixin.common.session.WxSessionManager; @@ -71,7 +72,7 @@ public WxCpMessageRouter(WxCpService wxCpService) { ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxCpMessageRouter-pool-%d").build(); this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory); - this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker(); + this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance(); this.sessionManager = wxCpService.getSessionManager(); this.exceptionHandler = new LogExceptionHandler(); } @@ -82,7 +83,7 @@ public WxCpMessageRouter(WxCpService wxCpService) { public WxCpMessageRouter(WxCpService wxMpService, ExecutorService executorService) { this.wxCpService = wxMpService; this.executorService = executorService; - this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker(); + this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance(); this.sessionManager = wxCpService.getSessionManager(); this.exceptionHandler = new LogExceptionHandler(); } diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageRouter.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageRouter.java index 70ad0a64d3..848f089c6b 100644 --- a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageRouter.java +++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageRouter.java @@ -5,6 +5,7 @@ import me.chanjar.weixin.common.api.WxErrorExceptionHandler; import me.chanjar.weixin.common.api.WxMessageDuplicateChecker; import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker; +import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateCheckerSingleton; import me.chanjar.weixin.common.session.InternalSession; import me.chanjar.weixin.common.session.InternalSessionManager; import me.chanjar.weixin.common.session.WxSessionManager; @@ -73,7 +74,7 @@ public WxCpTpMessageRouter(WxCpTpService wxCpTpService) { ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxCpTpMessageRouter-pool-%d").build(); this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory); - this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker(); + this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance(); this.sessionManager = wxCpTpService.getSessionManager(); this.exceptionHandler = new LogExceptionHandler(); } @@ -84,7 +85,7 @@ public WxCpTpMessageRouter(WxCpTpService wxCpTpService) { public WxCpTpMessageRouter(WxCpTpService wxCpTpService, ExecutorService executorService) { this.wxCpTpService = wxCpTpService; this.executorService = executorService; - this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker(); + this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance(); this.sessionManager = wxCpTpService.getSessionManager(); this.exceptionHandler = new LogExceptionHandler(); } diff --git a/weixin-java-miniapp/src/main/java/cn/binarywang/wx/miniapp/message/WxMaMessageRouter.java b/weixin-java-miniapp/src/main/java/cn/binarywang/wx/miniapp/message/WxMaMessageRouter.java index e2c497e139..a5f714edd0 100644 --- a/weixin-java-miniapp/src/main/java/cn/binarywang/wx/miniapp/message/WxMaMessageRouter.java +++ b/weixin-java-miniapp/src/main/java/cn/binarywang/wx/miniapp/message/WxMaMessageRouter.java @@ -7,6 +7,7 @@ import me.chanjar.weixin.common.api.WxErrorExceptionHandler; import me.chanjar.weixin.common.api.WxMessageDuplicateChecker; import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker; +import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateCheckerSingleton; import me.chanjar.weixin.common.session.InternalSession; import me.chanjar.weixin.common.session.InternalSessionManager; import me.chanjar.weixin.common.session.StandardSessionManager; @@ -48,7 +49,7 @@ public WxMaMessageRouter(WxMaService wxMaService) { 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory); this.sessionManager = new StandardSessionManager(); this.exceptionHandler = new LogExceptionHandler(); - this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker(); + this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance(); } /** @@ -59,7 +60,7 @@ public WxMaMessageRouter(WxMaService wxMaService, ExecutorService executorServic this.executorService = executorService; this.sessionManager = new StandardSessionManager(); this.exceptionHandler = new LogExceptionHandler(); - this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker(); + this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance(); } /** diff --git a/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java b/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java index 4a2291050c..e55e499098 100644 --- a/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java +++ b/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java @@ -6,6 +6,7 @@ import me.chanjar.weixin.common.api.WxErrorExceptionHandler; import me.chanjar.weixin.common.api.WxMessageDuplicateChecker; import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker; +import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateCheckerSingleton; import me.chanjar.weixin.common.session.InternalSession; import me.chanjar.weixin.common.session.InternalSessionManager; import me.chanjar.weixin.common.session.StandardSessionManager; @@ -72,7 +73,7 @@ public WxMpMessageRouter(WxMpService wxMpService) { ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxMpMessageRouter-pool-%d").build(); this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory); - this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker(); + this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance(); this.sessionManager = new StandardSessionManager(); this.exceptionHandler = new LogExceptionHandler(); } @@ -83,7 +84,7 @@ public WxMpMessageRouter(WxMpService wxMpService) { public WxMpMessageRouter(WxMpService wxMpService, ExecutorService executorService) { this.wxMpService = wxMpService; this.executorService = executorService; - this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker(); + this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance(); this.sessionManager = new StandardSessionManager(); this.exceptionHandler = new LogExceptionHandler(); }