From 2e0d7e96b6fc350128139b3b789d9976b6543f8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E8=BF=9C=E5=BE=81hd?= Date: Tue, 1 Jun 2021 17:53:19 +0800 Subject: [PATCH 1/3] cherrypick --- .../manager/DefaultExecutorRepository.java | 66 +++++++++++-------- .../manager/ExecutorRepository.java | 4 ++ .../manager/ExecutorRepositoryTest.java | 2 - .../config/bootstrap/DubboBootstrap.java | 6 +- .../remoting/transport/AbstractClient.java | 14 ++-- .../transport/netty/ThreadNameTest.java | 2 +- 6 files changed, 53 insertions(+), 41 deletions(-) diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java index 02bd6c977b8..ce88b4e210c 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java @@ -16,6 +16,15 @@ */ package org.apache.dubbo.common.threadpool.manager; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.extension.ExtensionLoader; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory; +import org.apache.dubbo.common.threadpool.ThreadPool; +import org.apache.dubbo.common.utils.ExecutorUtil; +import org.apache.dubbo.common.utils.NamedThreadFactory; + import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -26,16 +35,9 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.extension.ExtensionLoader; -import org.apache.dubbo.common.logger.Logger; -import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory; -import org.apache.dubbo.common.threadpool.ThreadPool; -import org.apache.dubbo.common.utils.NamedThreadFactory; - import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE; import static org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY; import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY; /** @@ -94,12 +96,9 @@ public DefaultExecutorRepository() { * @return */ public synchronized ExecutorService createExecutorIfAbsent(URL url) { - String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY; - if (CONSUMER_SIDE.equalsIgnoreCase(url.getSide())) { - componentKey = CONSUMER_SIDE; - } - Map executors = data.computeIfAbsent(componentKey, k -> new ConcurrentHashMap<>()); - Integer portKey = url.getPort(); + Map executors = data.computeIfAbsent(EXECUTOR_SERVICE_COMPONENT_KEY, k -> new ConcurrentHashMap<>()); + //issue-7054:Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol. + Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort(); ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(url)); // If executor has been shut down, create a new one if (executor.isShutdown() || executor.isTerminated()) { @@ -111,11 +110,7 @@ public synchronized ExecutorService createExecutorIfAbsent(URL url) { } public ExecutorService getExecutor(URL url) { - String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY; - if (CONSUMER_SIDE.equalsIgnoreCase(url.getSide())) { - componentKey = CONSUMER_SIDE; - } - Map executors = data.get(componentKey); + Map executors = data.get(EXECUTOR_SERVICE_COMPONENT_KEY); /** * It's guaranteed that this method is called after {@link #createExecutorIfAbsent(URL)}, so data should already @@ -127,16 +122,20 @@ public ExecutorService getExecutor(URL url) { return null; } - Integer portKey = url.getPort(); + //issue-7054:Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol. + Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort(); ExecutorService executor = executors.get(portKey); - if (executor != null) { - if (executor.isShutdown() || executor.isTerminated()) { - executors.remove(portKey); - executor = createExecutor(url); - executors.put(portKey, executor); - } + if (executor != null && (executor.isShutdown() || executor.isTerminated())) { + executors.remove(portKey); + // Does not re-create a shutdown executor, use SHARED_EXECUTOR for downgrade. + executor = null; + logger.info("Executor for " + url + " is shutdown."); + } + if (executor == null) { + return SHARED_EXECUTOR; + } else { + return executor; } - return executor; } @Override @@ -209,4 +208,17 @@ private ExecutorService createExecutor(URL url) { public ExecutorService getPoolRouterExecutor() { return poolRouterExecutor; } + + @Override + public void destroyAll() { + data.values().forEach(executors -> { + if (executors != null) { + executors.values().forEach(executor -> { + if (executor != null && !executor.isShutdown()) { + ExecutorUtil.shutdownNow(executor, 100); + } + }); + } + }); + } } diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java index 0a8b145ca86..8e4ce873202 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java @@ -79,4 +79,8 @@ public interface ExecutorRepository { ExecutorService getPoolRouterExecutor(); + /** + * Destroy all executors that are not in shutdown state + */ + void destroyAll(); } diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepositoryTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepositoryTest.java index bb3fe4ea97b..b19eb372c01 100644 --- a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepositoryTest.java +++ b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepositoryTest.java @@ -39,8 +39,6 @@ public void testGetExecutor() { } private void testGet(URL url) { - Assertions.assertNull(executorRepository.getExecutor(url)); - ExecutorService executorService = executorRepository.createExecutorIfAbsent(url); executorService.shutdown(); executorService = executorRepository.createExecutorIfAbsent(url); diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java index ea9d8f83757..62de4214a43 100644 --- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java +++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java @@ -1225,7 +1225,7 @@ public void destroy() { destroyRegistries(); destroyServiceDiscoveries(); - + destroyExecutorRepository(); clear(); shutdown(); release(); @@ -1238,6 +1238,10 @@ public void destroy() { } } + private void destroyExecutorRepository() { + ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().destroyAll(); + } + private void destroyRegistries() { AbstractRegistryFactory.destroyAll(); } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java index f4ae30980f6..2da53771ce1 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java @@ -38,6 +38,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL; import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY; /** * AbstractClient @@ -48,6 +49,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class); private final Lock connectLock = new ReentrantLock(); private final boolean needReconnect; + //issue-7054:Consumer's executor is sharing globally. protected volatile ExecutorService executor; private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension(); @@ -90,7 +92,8 @@ public AbstractClient(URL url, ChannelHandler handler) throws RemotingException } private void initExecutor(URL url) { - url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME); + //issue-7054:Consumer's executor is sharing globally, thread name not require provider ip. + url = url.addParameter(THREAD_NAME_KEY, CLIENT_THREAD_POOL_NAME); url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL); executor = executorRepository.createExecutorIfAbsent(url); } @@ -277,14 +280,6 @@ public void close() { logger.warn(e.getMessage(), e); } - try { - if (executor != null) { - ExecutorUtil.shutdownNow(executor, 100); - } - } catch (Throwable e) { - logger.warn(e.getMessage(), e); - } - try { disconnect(); } catch (Throwable e) { @@ -304,7 +299,6 @@ public void close() { @Override public void close(int timeout) { - ExecutorUtil.gracefulShutdown(executor, timeout); close(); } diff --git a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ThreadNameTest.java b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ThreadNameTest.java index ef3b23a8d60..7c1b310cbde 100644 --- a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ThreadNameTest.java +++ b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ThreadNameTest.java @@ -43,7 +43,7 @@ public class ThreadNameTest { private ThreadNameVerifyHandler clientHandler; private static String serverRegex = "DubboServerHandler\\-localhost:(\\d+)\\-thread\\-(\\d+)"; - private static String clientRegex = "DubboClientHandler\\-localhost:(\\d+)\\-thread\\-(\\d+)"; + private static String clientRegex = "DubboClientHandler\\-thread\\-(\\d+)"; private final CountDownLatch serverLatch = new CountDownLatch(1); private final CountDownLatch clientLatch = new CountDownLatch(1); From 9924460ccfd44768c127cd91215fae2f95e02269 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E8=BF=9C=E5=BE=81hd?= Date: Tue, 1 Jun 2021 18:06:08 +0800 Subject: [PATCH 2/3] remove unused import --- .../java/org/apache/dubbo/remoting/transport/AbstractClient.java | 1 - 1 file changed, 1 deletion(-) diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java index 2da53771ce1..f263fd021bf 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java @@ -22,7 +22,6 @@ import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.threadpool.manager.ExecutorRepository; -import org.apache.dubbo.common.utils.ExecutorUtil; import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.remoting.Channel; import org.apache.dubbo.remoting.ChannelHandler; From 8d3e5e6e5da2704e6caeec1cd0abfbc083fce8d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E8=BF=9C=E5=BE=81hd?= Date: Thu, 3 Jun 2021 16:51:22 +0800 Subject: [PATCH 3/3] modify comment, remove issue-id --- .../common/threadpool/manager/DefaultExecutorRepository.java | 4 ++-- .../org/apache/dubbo/remoting/transport/AbstractClient.java | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java index ce88b4e210c..e104283a745 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java @@ -97,7 +97,7 @@ public DefaultExecutorRepository() { */ public synchronized ExecutorService createExecutorIfAbsent(URL url) { Map executors = data.computeIfAbsent(EXECUTOR_SERVICE_COMPONENT_KEY, k -> new ConcurrentHashMap<>()); - //issue-7054:Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol. + // Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol. Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort(); ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(url)); // If executor has been shut down, create a new one @@ -122,7 +122,7 @@ public ExecutorService getExecutor(URL url) { return null; } - //issue-7054:Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol. + // Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol. Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort(); ExecutorService executor = executors.get(portKey); if (executor != null && (executor.isShutdown() || executor.isTerminated())) { diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java index f263fd021bf..bd8c0e9a971 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java @@ -48,7 +48,6 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class); private final Lock connectLock = new ReentrantLock(); private final boolean needReconnect; - //issue-7054:Consumer's executor is sharing globally. protected volatile ExecutorService executor; private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension(); @@ -91,7 +90,7 @@ public AbstractClient(URL url, ChannelHandler handler) throws RemotingException } private void initExecutor(URL url) { - //issue-7054:Consumer's executor is sharing globally, thread name not require provider ip. + // Consumer's executor is sharing globally, thread name not require provider ip. url = url.addParameter(THREAD_NAME_KEY, CLIENT_THREAD_POOL_NAME); url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL); executor = executorRepository.createExecutorIfAbsent(url);