Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

增加企业微信路由线程池关闭方法 #2583

Merged
merged 3 commits into from
Apr 11, 2022
Merged

增加企业微信路由线程池关闭方法 #2583

merged 3 commits into from
Apr 11, 2022

Conversation

nadirvishun
Copy link

@nadirvishun nadirvishun commented Apr 11, 2022

Tomcat 不能 graceful shutdown中只提供了微信公众号的方法,这次是增加企业微信相关方法
Fixes #535

@WangDada2016
Copy link

请问shutDownExecutorService这个方法是手动调用,还是每次http关闭时会自动调用该方法?

@0katekate0
Copy link

请问下,这个shutDownExecutorService这个方法应该在哪里去调用?

@nadirvishun
Copy link
Author

我这边是整个spring项目关闭时才需要手动调用执行,而不是单个http请求就要关闭,而且是只有优雅关闭时才需要,如果强制结束进程调不调用都无所谓。处理的方式是用@PreDestroy来实现的,以提供的demo中的类为例子:

    /**
     * 结束时关闭路由线程池
     */
    @PreDestroy
    public void destroy() {
        messageRouter.shutDownExecutorService(5);
    }
    /**
     * 结束时关闭线程池
     */
    @PreDestroy
    public void destroy() {
        List<WxCpMessageRouter> messageRouterList = new ArrayList<>(routers.values());
        if (messageRouterList.size() > 0) {
            for (WxCpMessageRouter messageRouter : messageRouterList) {
                messageRouter.shutDownExecutorService(5);
            }
        }
    }

@WangDada2016
Copy link

如果是整个spring项目关闭时才需要手动调用执行,并不能解决因为jvm的设置导致创建线程过多自动宕机的。以下是我的错误日志。
Current thread (0x00007f8c3d987800): JavaThread "WxCpTpMessageRouter-pool-1" [_thread_new, id=890402, stack(0x00007f87ccec4000,0x00007f87ccf45000)]

Stack: [0x00007f87ccec4000,0x00007f87ccf45000], sp=0x00007f87ccf437a0, free space=509k
Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
V [libjvm.so+0xad21aa] VMError::report_and_die()+0x2ba
V [libjvm.so+0x5022db] report_vm_out_of_memory(char const*, int, unsigned long, VMErrorType, char const*)+0x8b
V [libjvm.so+0x92fe33] os::Linux::commit_memory_impl(char*, unsigned long, bool)+0x123
V [libjvm.so+0x92fefc] os::pd_commit_memory(char*, unsigned long, bool)+0xc
V [libjvm.so+0x927f8a] os::commit_memory(char*, unsigned long, bool)+0x2a
V [libjvm.so+0x92bc4f] os::pd_create_stack_guard_pages(char*, unsigned long)+0x7f
V [libjvm.so+0xa771ce] JavaThread::create_stack_guard_pages()+0x5e
V [libjvm.so+0xa81454] JavaThread::run()+0x34
V [libjvm.so+0x930198] java_start(Thread*)+0x108
C [libpthread.so.0+0x7ea5] start_thread+0xc5

--------------- P R O C E S S ---------------

Java Threads: ( => current thread )
=>0x00007f8c3d987800 JavaThread "WxCpTpMessageRouter-pool-1" [_thread_new, id=890402, stack(0x00007f87ccec4000,0x00007f87ccf45000)]
0x00007f8c3d985800 JavaThread "WxCpTpMessageRouter-pool-0" [_thread_blocked, id=890401, stack(0x00007f87ccf45000,0x00007f87ccfc6000)]
0x00007f8c3d983800 JavaThread "Thread-10870" daemon [_thread_blocked, id=890400, stack(0x00007f87ccfc6000,0x00007f87cd047000)]
0x00007f8c4cd83000 JavaThread "Keep-Alive-Timer" daemon [_thread_blocked, id=890378, stack(0x00007f87e02a7000,0x00007f87e0328000)]
0x00007f8c3df83000 JavaThread "WxCpTpMessageRouter-pool-1" [_thread_blocked, id=889734, stack(0x00007f87cd047000,0x00007f87cd0c8000)]
0x00007f8c3df80800 JavaThread "WxCpTpMessageRouter-pool-0" [_thread_blocked, id=889733, stack(0x00007f87cd0c8000,0x00007f87cd149000)]
0x00007f8c3df7e800 JavaThread "Thread-10869" daemon [_thread_blocked, id=889732, stack(0x00007f87cd149000,0x00007f87cd1ca000)]
0x00007f8c3291e800 JavaThread "WxCpTpMessageRouter-pool-1" [_thread_blocked, id=889731, stack(0x00007f87cd1ca000,0x00007f87cd24b000)]
0x00007f8c224fc000 JavaThread "WxCpTpMessageRouter-pool-1" [_thread_blocked, id=889730, stack(0x00007f87cd24b000,0x00007f87cd2cc000)]
0x00007f8c2cea1000 JavaThread "WxCpTpMessageRouter-pool-1" [_thread_blocked, id=889729, stack(0x00007f87cd2cc000,0x00007f87cd34d000)]
0x00007f8c2ce9f000 JavaThread "WxCpTpMessageRouter-pool-0" [_thread_blocked, id=889728, stack(0x00007f87cd34d000,0x00007f87cd3ce000)]
0x00007f8c2ce9d000 JavaThread "Thread-10868" daemon [_thread_blocked, id=889727, stack(0x00007f87cd3ce000,0x00007f87cd44f000)]
0x00007f8c4cd9a800 JavaThread "WxCpTpMessageRouter-pool-1" [_thread_blocked, id=889726, stack(0x00007f87cd44f000,0x00007f87cd4d0000)]
0x00007f8c4cd98800 JavaThread "WxCpTpMessageRouter-pool-0" [_thread_blocked, id=889725, stack(0x00007f87cd4d0000,0x00007f87cd551000)]
0x00007f8c4cd96800 JavaThread "Thread-10867" daemon [_thread_blocked, id=889724, stack(0x00007f87cd551000,0x00007f87cd5d2000)]
0x00007f8c224fa000 JavaThread "WxCpTpMessageRouter-pool-0" [_thread_blocked, id=889723, stack(0x00007f87cd5d2000,0x00007f87cd653000)]
0x00007f8c224f8000 JavaThread "Thread-10866" daemon [_thread_blocked, id=889722, stack(0x00007f87cd653000,0x00007f87cd6d4000)]
0x00007f8c3291c800 JavaThread "WxCpTpMessageRouter-pool-0" [_thread_blocked, id=889721, stack(0x00007f87cd6d4000,0x00007f87cd755000)]
0x00007f8c3291a800 JavaThread "Thread-10865" daemon [_thread_blocked, id=889720, stack(0x00007f87cd755000,0x00007f87cd7d6000)]
0x00007f8c111c0000 JavaThread "WxCpTpMessageRouter-pool-1" [_thread_blocked, id=889719, stack(0x00007f87cd7d6000,0x00007f87cd857000)]
0x00007f8c51172800 JavaThread "WxCpTpMessageRouter-pool-1" [_thread_blocked, id=889718, stack(0x00007f87cd857000,0x00007f87cd8d8000)]
0x00007f8c111be000 JavaThread "WxCpTpMessageRouter-pool-0" [_thread_blocked, id=889717, stack(0x00007f87cd8d8000,0x00007f87cd959000)]
0x00007f8c111bc000 JavaThread "Thread-10864" daemon [_thread_blocked, id=889716, stack(0x00007f87cd959000,0x00007f87cd9da000)]
0x00007f8c51170800 JavaThread "WxCpTpMessageRouter-pool-0" [_thread_blocked, id=889715, stack(0x00007f87cd9da000,0x00007f87cda5b000)]
0x00007f8c5116e800 JavaThread "Thread-10863" daemon [_thread_blocked, id=889714, stack(0x00007f87cda5b000,0x00007f87cdadc000)]
0x00007f8c3df7c800 JavaThread "WxCpTpMessageRouter-pool-1" [_thread_blocked, id=889032, stack(0x00007f87cdadc000,0x00007f87cdb5d000)]
0x00007f8c3d1c8000 JavaThread "WxCpTpMessageRouter-pool-0" [_thread_blocked, id=889031, stack(0x00007f87cdb5d000,0x00007f87cdbde000)]

@WangDada2016
Copy link

WangDada2016 commented Apr 24, 2022

目前我将shutDownExecutorService方法加在
private WxMpXmlOutMessage route(WxMpXmlMessage message) {
try {
return this.messageRouter.route(message);
} catch (Exception e) {
log.error("路由消息时出现异常!", e);
}finally {
this.messageRouter.shutDownExecutorService();
}
return null;
}

@nadirvishun
Copy link
Author

目前我将shutDownExecutorService方法加在 private WxMpXmlOutMessage route(WxMpXmlMessage message) { try { return this.messageRouter.route(message); } catch (Exception e) { log.error("路由消息时出现异常!", e); }finally { this.messageRouter.shutDownExecutorService(); } return null; }

这样不行吧?如果放到这里关闭了,下次再来一个消息怎么办?你在哪里重新创建线程池的?
创建线程过多导致宕机个人感觉可以从这两个方面入手:

  1. 自定义线程池,不要用系统自己默认的,合理设置开启的线程数和设置有界的队列和拒绝策略,可能会减轻消耗。
  2. 参考微信公众号用threadlocal来存储相关,将路由这个弄成单例的,这样就只有一个线程池而不会创建很多个。

@WangDada2016
Copy link

每次收到一个事件推送就会创建一次呀
protected static WxCpTpMessageRouter newRouter(WxCpTpService wxCpTpService) {
final val newRouter = new WxCpTpMessageRouter(wxCpTpService);
}
public WxCpTpMessageRouter(WxCpTpService wxCpTpService) {
this.wxCpTpService = wxCpTpService;
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxCpTpMessageRouter-pool-%d").build();
this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory);
this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
this.sessionManager = wxCpTpService.getSessionManager();
this.exceptionHandler = new LogExceptionHandler();
}

@nadirvishun
Copy link
Author

demo中都是先建好存起来,用的时候直接调用,如果你那边能保证能新建,逻辑上倒是没什么问题,但是线程池的目的就是为了线程复用,避免线程创建销毁带来的性能消耗的。如果这样频繁创新关闭。。。。
实在不行可以看下你程序中真的有用到异步路由吗?如果没有用可以去掉相关的异步路由,我看代码中只有异步路由才会走线程池,所以去掉后就不会存在线程池的占用了。

@WangDada2016
Copy link

你可以debug看下,每次请求有没有新建的,我目前没有用到异步路由,也是有创建的

@nadirvishun
Copy link
Author

你可以debug看下,每次请求有没有新建的,我目前没有用到异步路由,也是有创建的

是说路由的新建吗?这个官方只是给了个demo,实际上每个人的实现可都不一样,如果是按照weixin-java-cp-demo来的话(这是企业微信自建应用的,没找到企业微信第三方的demo),每次来消息并不需要要新建啊。
然后同样是这个官方demo中有用到异步路由:

private WxCpMessageRouter newRouter(WxCpService wxCpService) {
    final val newRouter = new WxCpMessageRouter(wxCpService);
    // 记录所有事件的日志 (异步执行)
    newRouter.rule().handler(this.logHandler).next();
    ...
}

@WangDada2016
Copy link

WangDada2016 commented Apr 24, 2022

自建和第三方实现都是一样的,你可以把断点打在源码上,然后模拟请求发送,看源码的执行。
现在我已经改为自定义线程池了。
核心线程 8 最大线程 20 保活时间30s 存储队列 10 有守护线程 拒绝策略:将超负荷任务回退到调用者
默认使用核心线程(8)数执行任务,任务数量超过核心线程数就丢到队列,队列(10)满了就再开启新的线程,新的线程数最大为20,当任务执行完,新开启的线程将存活30s,若没有任务就消亡,线程池回到核心线程数量
private static final ExecutorService executor = new ThreadPoolExecutor(8, 20,
30L, TimeUnit.SECONDS, new LinkedBlockingQueue(10),
new ThreadFactoryBuilder().setNameFormat("myExecutor-WxCpTpMessageRouter-pool--%d").setDaemon(true).build(),
new ThreadPoolExecutor.CallerRunsPolicy());

@nadirvishun
Copy link
Author

我实在是没看出来哪里需要新建啊,不是在WxCpConfiguration中初始化时就按照应用来把路由创建好了,然后存到map中了吗?还有如果没有异步路由压根不会走线程池啊,不如你好好对比下和demo是否一致,或者是贴一下你写的配置类WxCpConfiguration 和入口类WxPortalController的代码看下啊。

@WangDada2016
Copy link

断点打在WxCpTpMessageRouter源码的构造方法上
1、public WxCpTpMessageRouter(WxCpTpService wxCpTpService) {
this.wxCpTpService = wxCpTpService;
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxCpTpMessageRouter-pool-%d").build();
this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory);
this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
this.sessionManager = wxCpTpService.getSessionManager();
this.exceptionHandler = new LogExceptionHandler();
}
2、 public WxCpXmlOutMessage route(final WxCpTpXmlMessage wxMessage, final Map<String, Object> context)
WxCpXmlOutMessage res = null;
final List futures = new ArrayList<>();
for (final WxCpTpMessageRouterRule rule : matchRules) {
// 返回最后一个非异步的rule的执行结果
if (rule.isAsync()) {
futures.add(
this.executorService.submit(() -> {
rule.service(wxMessage, context, WxCpTpMessageRouter.this.wxCpTpService, WxCpTpMessageRouter.this.sessionManager, WxCpTpMessageRouter.this.exceptionHandler);
})
);
其中rule.isAsync()设置的是false,在源码执行还是true,会进到线程池里面

@nadirvishun
Copy link
Author

对于第一个,如果你是按照demo来,这个只有在WxCpConfigurationinitServices方法按照应用个数来执行一次,然后存起来复用,而不是来一个消息就执行一次。
对于的二个,rule.isAsync()如果是false,为什么在源码中还是true啊,怎么可能啊?是不是有好多个rules,其中有一个是true啊,是true的那个就是异步的,如果对你的逻辑没用的话可以注释掉,这样就不走线程池了。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Tomcat 不能 graceful shutdown
4 participants