Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dubbo-7054]消费者端Executor改为全局共享,不再按提供者Port共享。 #7109

Merged
merged 8 commits into from
Feb 22, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.ThreadPool;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NamedThreadFactory;

import java.util.Map;
Expand Down Expand Up @@ -71,12 +72,9 @@ public DefaultExecutorRepository() {
* @return
*/
public synchronized ExecutorService createExecutorIfAbsent(URL url) {
String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
componentKey = CONSUMER_SIDE;
}
Map<Integer, ExecutorService> executors = data.computeIfAbsent(componentKey, k -> new ConcurrentHashMap<>());
Integer portKey = url.getPort();
Map<Integer, ExecutorService> executors = data.computeIfAbsent(EXECUTOR_SERVICE_COMPONENT_KEY, k -> new ConcurrentHashMap<>());
//issue-7054:Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol.
Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort();
ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(url));
// If executor has been shut down, create a new one
if (executor.isShutdown() || executor.isTerminated()) {
Expand All @@ -88,12 +86,7 @@ public synchronized ExecutorService createExecutorIfAbsent(URL url) {
}

public ExecutorService getExecutor(URL url) {
String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
componentKey = CONSUMER_SIDE;
}
Map<Integer, ExecutorService> executors = data.get(componentKey);

Map<Integer, ExecutorService> executors = data.get(EXECUTOR_SERVICE_COMPONENT_KEY);
/**
* It's guaranteed that this method is called after {@link #createExecutorIfAbsent(URL)}, so data should already
* have Executor instances generated and stored.
Expand All @@ -103,17 +96,20 @@ public ExecutorService getExecutor(URL url) {
"before coming to here.");
return null;
}

Integer portKey = url.getPort();
//issue-7054:Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol.
Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort();
ExecutorService executor = executors.get(portKey);
if (executor != null) {
if (executor.isShutdown() || executor.isTerminated()) {
executors.remove(portKey);
executor = createExecutor(url);
executors.put(portKey, executor);
}
if (executor != null && (executor.isShutdown() || executor.isTerminated())) {
executors.remove(portKey);
// Does not re-create a shutdown executor, use SHARED_EXECUTOR for downgrade.
executor = null;
logger.info("Executor for " + url + " is shutdown.");
}
if (executor == null) {
return SHARED_EXECUTOR;
} else {
return executor;
}
return executor;
}

@Override
Expand Down Expand Up @@ -159,6 +155,19 @@ public ExecutorService getSharedExecutor() {
return SHARED_EXECUTOR;
}

@Override
public void destroyAll() {
data.values().forEach(executors -> {
if (executors != null) {
executors.values().forEach(executor -> {
if (executor != null && !executor.isShutdown()) {
ExecutorUtil.shutdownNow(executor, 100);
}
});
}
});
}

private ExecutorService createExecutor(URL url) {
return (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,8 @@ public interface ExecutorRepository {
*/
ExecutorService getSharedExecutor();

/**
* Destroy all executors that are not in shutdown state
*/
void destroyAll();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ public void testGetExecutor() {
}

private void testGet(URL url) {
Assertions.assertNull(executorRepository.getExecutor(url));

ExecutorService executorService = executorRepository.createExecutorIfAbsent(url);
executorService.shutdown();
executorService = executorRepository.createExecutorIfAbsent(url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1257,7 +1257,7 @@ public void destroy() {
destroyRegistries();
DubboShutdownHook.destroyProtocols();
destroyServiceDiscoveries();

destroyExecutorRepository();
clear();
shutdown();
release();
Expand All @@ -1268,6 +1268,10 @@ public void destroy() {
}
}

private void destroyExecutorRepository() {
ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().destroyAll();
}

private void destroyRegistries() {
AbstractRegistryFactory.destroyAll();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
Expand All @@ -38,6 +37,7 @@

import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;

/**
* AbstractClient
Expand All @@ -48,6 +48,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class);
private final Lock connectLock = new ReentrantLock();
private final boolean needReconnect;
//issue-7054:Consumer's executor is sharing globally.
protected volatile ExecutorService executor;
private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();

Expand Down Expand Up @@ -90,7 +91,8 @@ public AbstractClient(URL url, ChannelHandler handler) throws RemotingException
}

private void initExecutor(URL url) {
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
//issue-7054:Consumer's executor is sharing globally, thread name not require provider ip.
url = url.addParameter(THREAD_NAME_KEY, CLIENT_THREAD_POOL_NAME);
url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
executor = executorRepository.createExecutorIfAbsent(url);
}
Expand Down Expand Up @@ -277,14 +279,6 @@ public void close() {
logger.warn(e.getMessage(), e);
}

try {
if (executor != null) {
ExecutorUtil.shutdownNow(executor, 100);
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}

try {
disconnect();
} catch (Throwable e) {
Expand All @@ -304,7 +298,6 @@ public void close() {

@Override
public void close(int timeout) {
ExecutorUtil.gracefulShutdown(executor, timeout);
close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class ThreadNameTest {
private ThreadNameVerifyHandler clientHandler;

private static String serverRegex = "DubboServerHandler\\-localhost:(\\d+)\\-thread\\-(\\d+)";
private static String clientRegex = "DubboClientHandler\\-localhost:(\\d+)\\-thread\\-(\\d+)";
private static String clientRegex = "DubboClientHandler\\-thread\\-(\\d+)";

@BeforeEach
public void before() throws Exception {
Expand Down