Skip to content

Commit

Permalink
消费者端Executor改为全局共享,不再按提供者Port共享。 (apache#7109)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyz-hd authored Feb 22, 2021
1 parent f75c22c commit 2dadaf8
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.ThreadPool;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NamedThreadFactory;

import java.util.Map;
Expand Down Expand Up @@ -71,12 +72,9 @@ public DefaultExecutorRepository() {
* @return
*/
public synchronized ExecutorService createExecutorIfAbsent(URL url) {
String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
componentKey = CONSUMER_SIDE;
}
Map<Integer, ExecutorService> executors = data.computeIfAbsent(componentKey, k -> new ConcurrentHashMap<>());
Integer portKey = url.getPort();
Map<Integer, ExecutorService> executors = data.computeIfAbsent(EXECUTOR_SERVICE_COMPONENT_KEY, k -> new ConcurrentHashMap<>());
//issue-7054:Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol.
Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort();
ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(url));
// If executor has been shut down, create a new one
if (executor.isShutdown() || executor.isTerminated()) {
Expand All @@ -88,12 +86,7 @@ public synchronized ExecutorService createExecutorIfAbsent(URL url) {
}

public ExecutorService getExecutor(URL url) {
String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
componentKey = CONSUMER_SIDE;
}
Map<Integer, ExecutorService> executors = data.get(componentKey);

Map<Integer, ExecutorService> executors = data.get(EXECUTOR_SERVICE_COMPONENT_KEY);
/**
* It's guaranteed that this method is called after {@link #createExecutorIfAbsent(URL)}, so data should already
* have Executor instances generated and stored.
Expand All @@ -103,17 +96,20 @@ public ExecutorService getExecutor(URL url) {
"before coming to here.");
return null;
}

Integer portKey = url.getPort();
//issue-7054:Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol.
Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort();
ExecutorService executor = executors.get(portKey);
if (executor != null) {
if (executor.isShutdown() || executor.isTerminated()) {
executors.remove(portKey);
executor = createExecutor(url);
executors.put(portKey, executor);
}
if (executor != null && (executor.isShutdown() || executor.isTerminated())) {
executors.remove(portKey);
// Does not re-create a shutdown executor, use SHARED_EXECUTOR for downgrade.
executor = null;
logger.info("Executor for " + url + " is shutdown.");
}
if (executor == null) {
return SHARED_EXECUTOR;
} else {
return executor;
}
return executor;
}

@Override
Expand Down Expand Up @@ -159,6 +155,19 @@ public ExecutorService getSharedExecutor() {
return SHARED_EXECUTOR;
}

@Override
public void destroyAll() {
data.values().forEach(executors -> {
if (executors != null) {
executors.values().forEach(executor -> {
if (executor != null && !executor.isShutdown()) {
ExecutorUtil.shutdownNow(executor, 100);
}
});
}
});
}

private ExecutorService createExecutor(URL url) {
return (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,8 @@ public interface ExecutorRepository {
*/
ExecutorService getSharedExecutor();

/**
* Destroy all executors that are not in shutdown state
*/
void destroyAll();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ public void testGetExecutor() {
}

private void testGet(URL url) {
Assertions.assertNull(executorRepository.getExecutor(url));

ExecutorService executorService = executorRepository.createExecutorIfAbsent(url);
executorService.shutdown();
executorService = executorRepository.createExecutorIfAbsent(url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1269,7 +1269,7 @@ public void destroy() {
destroyRegistries();
DubboShutdownHook.destroyProtocols();
destroyServiceDiscoveries();

destroyExecutorRepository();
clear();
shutdown();
release();
Expand All @@ -1282,6 +1282,10 @@ public void destroy() {
}
}

private void destroyExecutorRepository() {
ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().destroyAll();
}

private void destroyRegistries() {
AbstractRegistryFactory.destroyAll();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
Expand All @@ -38,6 +37,7 @@

import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;

/**
* AbstractClient
Expand All @@ -48,6 +48,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class);
private final Lock connectLock = new ReentrantLock();
private final boolean needReconnect;
//issue-7054:Consumer's executor is sharing globally.
protected volatile ExecutorService executor;
private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();

Expand Down Expand Up @@ -90,7 +91,8 @@ public AbstractClient(URL url, ChannelHandler handler) throws RemotingException
}

private void initExecutor(URL url) {
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
//issue-7054:Consumer's executor is sharing globally, thread name not require provider ip.
url = url.addParameter(THREAD_NAME_KEY, CLIENT_THREAD_POOL_NAME);
url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
executor = executorRepository.createExecutorIfAbsent(url);
}
Expand Down Expand Up @@ -277,14 +279,6 @@ public void close() {
logger.warn(e.getMessage(), e);
}

try {
if (executor != null) {
ExecutorUtil.shutdownNow(executor, 100);
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}

try {
disconnect();
} catch (Throwable e) {
Expand All @@ -304,7 +298,6 @@ public void close() {

@Override
public void close(int timeout) {
ExecutorUtil.gracefulShutdown(executor, timeout);
close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class ThreadNameTest {
private ThreadNameVerifyHandler clientHandler;

private static String serverRegex = "DubboServerHandler\\-localhost:(\\d+)\\-thread\\-(\\d+)";
private static String clientRegex = "DubboClientHandler\\-localhost:(\\d+)\\-thread\\-(\\d+)";
private static String clientRegex = "DubboClientHandler\\-thread\\-(\\d+)";

@BeforeEach
public void before() throws Exception {
Expand Down

0 comments on commit 2dadaf8

Please sign in to comment.