Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consumer端线程池的实现非常不科学 #7054

Closed
wikiwikiwiki opened this issue Dec 25, 2020 · 19 comments
Closed

consumer端线程池的实现非常不科学 #7054

wikiwikiwiki opened this issue Dec 25, 2020 · 19 comments
Milestone

Comments

@wikiwikiwiki
Copy link

wikiwikiwiki commented Dec 25, 2020

  • Dubbo version: 2.7.8

先说结论吧,目前的实现问题非常大,对于一个默认配置的consumer,只要运行一段时间,consumer应用就会有n个线程数为200的FixedThreadPool,n是应用依赖的所有provider的不同端口的数量Set(provider.getPort()).size(),并且每一个provider下线都会导致关闭和重建一个FixedThreadPool。

详细解释一下,我们假设consumer端所有配置为默认,那么会有以下问题:

  1. 对不同的provider,consumer会通过一个map共用线程池,其中key是provider的端口。所以如果provider有多个端口,还是会使用多个线程池,当然仅仅是这个问题的话影响还不算大。
    但是考虑有100个provider,他们的端口全部相同,那么每个provider下线的时候,AbstractClient.close()方法会被调用,线程池会被关闭... 而dubbo有默认端口,在实际场景下大部分provider端口都一致,其中只要有一个下线,整个线程池就会关闭。
    所以在实际场景中,consumer使用的线程池一直在不断的关闭,不断的新建...

  2. 前面这个问题已经很大了,但这没有结束,它还会导致另外一个大问题...
    AbstractClient.initExecutor()方法会设置线程池的名字和类型,默认名字为DubboClientHandler,类型为cache,看起来都是CachedThreadPool。但实际上initExecutor方法调用在wrapChannelHandler方法之后,Dispatcher.dispatch()方法的url参数并没有设置为cache,所以对于同一个端口,最开始会是一个名字为DubboClientHandler的CachedThreadPool,但是因为上一个问题的存在,一台provider下线就会导致这个线程池被关闭。
    然后consumer继续调用provider,这时候会调用WrappedChannelHandler.getExecutorService()方法,因为原线程池被关闭,会直接新建一个线程池,但是因为WrappedChannelHandler里的url没有被设置过,重建的线程池会是一个名叫Dubbo的200个线程的FixedThreadPool... 并且这个FixedThreadPool还在一直随着provider下线被关闭和重建...

所以最终结果就是一堆200个线程的FixedThreadPool在不断关闭与重建......

@stonelion
Copy link
Contributor

#7044 可以通过设置 consumer 默认线程池为 cached 解决。

@wikiwikiwiki
Copy link
Author

wikiwikiwiki commented Dec 28, 2020

#7044 可以通过设置 consumer 默认线程池为 cached 解决。

看了一下你的pr,感觉你把这个问题想得太简单了,这不是一个简单给个默认配置的问题,是从设计上就有很多问题。

  1. 线程池以port为单位共享,这个共享粒度模糊不清。
    线程池的共享以provider的port作为key,这个地方问题就很大。一个consumer可能依赖不同service,不同的service的port可能一致也可能不一致;一个service里面provider的port可能一致也可能不一致。用port做key感觉不出任何道理。
    举个例子,一个服务可能依赖100个服务,其中99个服务端口都是20880,一个服务是20881,所以99个服务共享一个线程池,1个服务单独使用线程池,这不太科学。
    上面这个例子再改一下,100个服务中99个服务端口都是20880,剩余那个有9个provider,端口分别从20881到20889,所以99个服务共享一个线程池,某个服务单独9个线程池,更不科学了。
    所以,为什么是port为共享的粒度?感觉不出什么道理。再看一下,默认情况都是CachedThreadPool,那么其实很明显,默认情况下consumer端只需要有一个共享的CachedThreadPool就好,多个CachedThreadPool看不出任何好处。

  2. 共享线程池的创建有问题。
    因为线程池以port为单位,那么又会引发另外的问题,consumer端线程池没办法隔离,配置的参数可能不生效。
    如果我们想对某个服务单独配置一个线程池,我们可以在dubbo:reference里面配置<dubbo:parameter key="threadpool" value="fixed"/>,如果只是一个简单的demo,那这个配置确实能生效。但是实际上如果还有一个dubbo:reference的provider端口一样,那么它们会共享线程池,并且创建出来的线程池是FixedThreadPool还是CachedThreadPool,完全取决于哪一个reference先加载。

  3. 共享线程池的关闭有问题
    这个问题在这个issue里面已经说过了,对于n个同端口的provider,不管是不是属于同一个service,只要有一个下线(不用意外宕机),整个线程池都关闭然后再重建一次,不管重建的是CachedThreadPool还是FixedThreadPool,这显然都不应该。

所以说这里不止是一个默认配置的问题,你的改动是针对有问题的设计下错误实现的修改,即使改了,也没有解决根本问题。

说一下感觉应该改的地方:

  1. consumer端默认情况下应该就一个共享CachedThreadPool线程池实例。作为一个CachedThreadPool,在实际场景中其实看不出线程池有啥关闭的需要,DubboBootStrap在destroy的时候关闭下就好了。
  2. 看不出现在的设计到底有没有想在consumer端做线程池隔离(说不做又提供了隔离的方式,说做这隔离的方式又非常不靠谱...),如果设计意图是不需要,那整个consumer端只需要一个CachedThreadPool就行了;如果需要,那应该提供一个类似provider端protocol那样的东西,以这个东西为单位来做线程池隔离,并且提供参数指明没有provider时是否需要关闭,如果需要,用引用计数做关闭。

@zhongxiongzeng
Copy link

我觉得大多数应用的provider暴露的端口都是一样的,就等于是在共享同一个线程池了,共享线程池的方案还不错。
如果要做隔离的话是不是可以考虑用provider的host作为key好一点。

@wikiwikiwiki
Copy link
Author

wikiwikiwiki commented Jan 4, 2021

我觉得大多数应用的provider暴露的端口都是一样的,就等于是在共享同一个线程池了,共享线程池的方案还不错。
如果要做隔离的话是不是可以考虑用provider的host作为key好一点。

大多数端口一样只是简单环境的情况,要真碰到复杂环境就出问题了,线上因为这个出问题怪大家自己配置端口这肯定不行啊。

隔离用provider host做key没啥意义,不管是针对接口还是方法或是其它,都应该是针对一个服务的所有机器,一个服务可能100台机器,应该是针对这个服务做线程隔离,针对其中某台没什么用。

@zhangyz-hd
Copy link
Contributor

zhangyz-hd commented Jan 5, 2021

这个问题的核心,应该是:当某个提供者下线时,消费者端创建一个fixed的线程池,创建的时的堆栈如下

	at org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler.disconnected(AllChannelHandler.java:50)
	at org.apache.dubbo.remoting.exchange.support.header.HeartbeatHandler.disconnected(HeartbeatHandler.java:55)
	at org.apache.dubbo.remoting.transport.AbstractChannelHandlerDelegate.disconnected(AbstractChannelHandlerDelegate.java:48)
	at org.apache.dubbo.remoting.transport.AbstractPeer.disconnected(AbstractPeer.java:131)
	at org.apache.dubbo.remoting.transport.netty4.NettyClientHandler.channelInactive(NettyClientHandler.java:70)

看AllChannelHandler.disconnected中,目的应该是要一个共享的pool把断连后的ChannelEventRunnable任务完成

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();//<------
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

但是在DefaultExecutorRepository.getExecutor中,因为executor已经shutdown状态,所以if分支内又执行了一次executor = createExecutor(url);而这次创建的pool,是fixed类型,因为URL中的portKey没变化,因此覆盖了AbstractClient创建的cached池

        if (executor != null) {
            if (executor.isShutdown() || executor.isTerminated()) {
                executors.remove(portKey);
                executor = createExecutor(url);//<------
                executors.put(portKey, executor);
            }
        }

我觉得不一定要修复consumer端的线程模型,只需要修复AllChannelHandler.disconnected的问题,比如AllChannelHandler中ExecutorService固定为自身专用的池,或者改为全局共享的一个池。

@zhangyz-hd
Copy link
Contributor

@wikiwikiwiki see #7081

@xiaoheng1
Copy link
Contributor

xiaoheng1 commented Jan 11, 2021

@zhangyz-hd 当 provider 下线的时候,会重刷 invokers. 此时会销毁无用的 invoker,这是会导致客户端线程池被销毁吧?

并且现在的争论点并不是线程池的问题,而是是否应该销毁,创建,销毁这么一个过程。客户端线程池设计是否合理的问题。

@zhangyz-hd
Copy link
Contributor

@zhangyz-hd 当 provider 下线的时候,会重刷 invokers. 此时会销毁无用的 invoker,这是会导致客户端线程池被销毁吧?

并且现在的争论点并不是线程池的问题,而是是否应该销毁,创建,销毁这么一个过程。客户端线程池设计是否合理的问题。

对,我这里的线程池模型的确存在值得讨论的地方。
同时#7081先撤回,遗漏了一种场景。

@chickenlj
Copy link
Contributor

chickenlj commented Jan 15, 2021

目前consumer端的线程池在某些场景是会有问题的,在高并发场景下会创建大量的线程池。其中,2.7 和 2.6 版本分别对不同的调用场景做了优化,我认为两者可以合并到一起:

2.7版本:
将消费端 Executor 逻辑统一到了这个位置:
https://github.com/apache/dubbo/blob/master/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java#L187

https://github.com/apache/dubbo/blob/master/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java#L109

  • 同步调用,通过 ThreadlessExecutor 复用业务线程池,从io线程直接到业务线程
  • 异步调用,目前行为未变 ”io- > 线程池 -> 业务线程“,逻辑上有调整,统一到了ExecutorRepository 中,可以参考 2.6中改动,增加全局共享线程池配置

2.6版本:
2.6 版本中做了全局共享 consumer 线程池策略(同步&异步),避免线程数创建过多 #5891

AbstractClient.close()方法会被调用,线程池会被关闭... 而dubbo有默认端口,在实际场景下大部分provider端口都一致,其中只要有一个下线,整个线程池就会关闭。
所以在实际场景中,consumer使用的线程池一直在不断的关闭

@wikiwikiwiki 这个小点是不成立的,AbstractClient是共享的且有reference计数的,只有当所有的 provider 都下线时,AbstractClient实例才会被关闭

@chickenlj chickenlj added this to the 2.7.10 milestone Jan 15, 2021
@zhangyz-hd
Copy link
Contributor

@wikiwikiwiki @xiaoheng1
我尝试按消费者使用1个线程池的思路修改了一下,请验证下是否能解决
fix_7054_again

@wikiwikiwiki
Copy link
Author

wikiwikiwiki commented Jan 15, 2021

目前consumer端的线程池在某些场景是会有问题的,在高并发场景下会创建大量的线程池。其中,2.7 和 2.6 版本分别对不同的调用场景做了优化,我认为两者可以合并到一起:

2.7版本:
将消费端 Executor 逻辑统一到了这个位置:
https://github.com/apache/dubbo/blob/master/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java#L187

https://github.com/apache/dubbo/blob/master/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java#L109

  • 同步调用,通过 ThreadlessExecutor 复用业务线程池,从io线程直接到业务线程
  • 异步调用,目前行为未变 ”io- > 线程池 -> 业务线程“,逻辑上有调整,统一到了ExecutorRepository 中,可以参考 2.6中改动,增加全局共享线程池配置

2.6版本:
2.6 版本中做了全局共享 consumer 线程池策略(同步&异步),避免线程数创建过多 #5891

AbstractClient.close()方法会被调用,线程池会被关闭... 而dubbo有默认端口,在实际场景下大部分provider端口都一致,其中只要有一个下线,整个线程池就会关闭。
所以在实际场景中,consumer使用的线程池一直在不断的关闭

@wikiwikiwiki 这个小点是不成立的,AbstractClient是共享的且有reference计数的,只有当所有的 provider 都下线时,AbstractClient实例才会被关闭

这个小点是成立的啊... AbstractClient有引用计数,但是这个计数的key是url.getAddress(),也就是ip:port,共享相关代码我贴上来了。举个例子,一个服务三台provider { 192.168.1.2:8080, 192.168.1.3:8080, 192.168.1.4:8080 },consumer端建立的三个client不会共享,但是port完全相同,而线程池共享的单位为port也就是同一个线程池,结果就是任意一台下线都会关闭线程池导致重建。

private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
        String key = url.getAddress();
        List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);

2.6这个设置确实没注意到,感觉这应该是默认设置,默认一个共享。
2.7的同步线程池模型确实改了,但是还有事件处理和异步需要用到线程池,所以影响还在。
然后既然要改,这次是不是可以一次改好,提供dubbo:consumerthreadpool这样的东西,default是一个CachedThreadPool,想做隔离的可以自己配置。

@wikiwikiwiki
Copy link
Author

wikiwikiwiki commented Jan 15, 2021

@wikiwikiwiki @xiaoheng1
我尝试按消费者使用1个线程池的思路修改了一下,请验证下是否能解决
fix_7054_again

看了一下,感觉如果只是改成consumer端强制只使用一个CacheThreadPool逻辑上应该没啥问题,不过DubboBootstrap destroy的时候也应该关闭

@zhangyz-hd
Copy link
Contributor

zhangyz-hd commented Jan 15, 2021

@wikiwikiwiki @xiaoheng1
我尝试按消费者使用1个线程池的思路修改了一下,请验证下是否能解决
fix_7054_again

看了一下,感觉如果只是改成consumer端强制只使用一个CacheThreadPool逻辑上应该没啥问题,不过DubboBootstrap destroy的时候也应该关闭

已尝试在DubboBootstrap destroy中增加ExecutorRepository销毁。fix_7054_again

@kevinw66
Copy link
Contributor

@wikiwikiwiki @xiaoheng1
我尝试按消费者使用1个线程池的思路修改了一下,请验证下是否能解决
fix_7054_again

看了一下,感觉如果只是改成consumer端强制只使用一个CacheThreadPool逻辑上应该没啥问题,不过DubboBootstrap destroy的时候也应该关闭

已尝试在DubboBootstrap destroy中增加ExecutorRepository销毁。fix_7054_again

@zhangyz-hd 可以测试一下,然后提个PR

@zhangyz-hd
Copy link
Contributor

其实我觉得,DUBBO2.5、2.6下消费者端一个AbstractClient一个pool也这么多年了,要不把这种1:1的也保留下?

@xiaoheng1
Copy link
Contributor

我觉得这个做成配置的会更好,交由用户自行决定是否共享线程池。默认情况下共享。可以配置隔离线程池。

@zhangyz-hd
Copy link
Contributor

我觉得这个做成配置的会更好,交由用户自行决定是否共享线程池。默认情况下共享。可以配置隔离线程池。

比如这样?share.threadpool配置也加上了

@xiaoheng1
Copy link
Contributor

@zhangyz-hd 看了下逻辑,使用 ip+port 做隔离逻辑上没啥问题。

@fingthinking
Copy link

在我司的版本中将consumer端的线程池改成了公用的线程池,线上平稳运行2年+

public class SharedClientThreadPool implements ClientThreadPool {

    private static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler-Shared";

    private static final UnClosableExecutorService executor = new UnClosableExecutorService(Constants.DEFAULT_CORE_THREADS, Integer.MAX_VALUE,
            Constants.DEFAULT_ALIVE, TimeUnit.MILLISECONDS,
            new SynchronousQueue<>(),
            new NamedThreadFactory(CLIENT_THREAD_POOL_NAME, true), new AbortPolicyWithReport(CLIENT_THREAD_POOL_NAME, null));

    public Executor getExecutor(URL url) {
        return executor;
    }

}

public class UnClosableExecutorService implements ExecutorService {

    private ExecutorService executorService;

    public UnClosableExecutorService(int corePoolSize,
                                     int maximumPoolSize,
                                     long keepAliveTime,
                                     TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue,
                                     ThreadFactory threadFactory,
                                     RejectedExecutionHandler handler) {
        executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    public void shutdown() {
    }

    @Override
    public List<Runnable> shutdownNow() {
        return new ArrayList<>();
    }

    @Override
    public boolean isShutdown() {
        return executorService.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return executorService.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        throw new InterruptedException();
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return executorService.submit(task);
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return executorService.submit(task, result);
    }

    @Override
    public Future<?> submit(Runnable task) {
        return executorService.submit(task);
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return executorService.invokeAll(tasks);
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        return executorService.invokeAll(tasks, timeout, unit);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return executorService.invokeAny(tasks);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return executorService.invokeAny(tasks, timeout, unit);
    }

    @Override
    public void execute(Runnable command) {
        executorService.execute(command);
    }
}

vio-lin pushed a commit to vio-lin/incubator-dubbo that referenced this issue Jun 2, 2023
fixes apache#7054

(cherry picked from commit 2dadaf8)

# Conflicts:
#	dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepositoryTest.java
#	dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

9 participants