Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dubbo-7054]消费者端Executor调整为“全局共享”、“按提供者隔离”2个可选模式 #7120

Closed
wants to merge 10 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -364,4 +364,8 @@ public interface CommonConstants {
String SENTINEL_REDIS = "sentinel";

String CLUSTER_REDIS = "cluster";

String SHARE_EXECUTOR_KEY = "share.threadpool";

String SHARED_CONSUMER_EXECUTOR = "consumer-shared-pool";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.common.threadpool;

import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class ShareableExecutor extends AbstractExecutorService {

private ExecutorService executor;

public ShareableExecutor(ExecutorService executor) {
this.executor = executor;
}

@Override
public void shutdown() {
executor.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return executor.shutdownNow();
}

@Override
public boolean isShutdown() {
return executor.isShutdown();
}

@Override
public boolean isTerminated() {
return executor.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return executor.awaitTermination(timeout, unit);
}

@Override
public void execute(Runnable command) {
executor.execute(command);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.ShareableExecutor;
import org.apache.dubbo.common.threadpool.ThreadPool;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NamedThreadFactory;

import java.util.Map;
Expand All @@ -33,6 +35,8 @@

import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
import static org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.SHARED_CONSUMER_EXECUTOR;
import static org.apache.dubbo.common.constants.CommonConstants.SHARE_EXECUTOR_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;

Expand All @@ -52,7 +56,7 @@ public class DefaultExecutorRepository implements ExecutorRepository {

private ScheduledExecutorService reconnectScheduledExecutor;

private ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data = new ConcurrentHashMap<>();
private ConcurrentMap<String, ConcurrentMap<String, ExecutorService>> data = new ConcurrentHashMap<>();

public DefaultExecutorRepository() {
for (int i = 0; i < DEFAULT_SCHEDULER_SIZE; i++) {
Expand All @@ -71,29 +75,20 @@ public DefaultExecutorRepository() {
* @return
*/
public synchronized ExecutorService createExecutorIfAbsent(URL url) {
String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
componentKey = CONSUMER_SIDE;
}
Map<Integer, ExecutorService> executors = data.computeIfAbsent(componentKey, k -> new ConcurrentHashMap<>());
Integer portKey = url.getPort();
ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(url));
Map<String, ExecutorService> executors = data.computeIfAbsent(EXECUTOR_SERVICE_COMPONENT_KEY, k -> new ConcurrentHashMap<>());
String shareKey = getShareKey(url);
ExecutorService executor = executors.computeIfAbsent(shareKey, k -> createExecutor(url));
// If executor has been shut down, create a new one
if (executor.isShutdown() || executor.isTerminated()) {
executors.remove(portKey);
executors.remove(shareKey);
executor = createExecutor(url);
executors.put(portKey, executor);
executors.put(shareKey, executor);
}
return executor;
}

public ExecutorService getExecutor(URL url) {
String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
componentKey = CONSUMER_SIDE;
}
Map<Integer, ExecutorService> executors = data.get(componentKey);

Map<String, ExecutorService> executors = data.get(EXECUTOR_SERVICE_COMPONENT_KEY);
/**
* It's guaranteed that this method is called after {@link #createExecutorIfAbsent(URL)}, so data should already
* have Executor instances generated and stored.
Expand All @@ -103,17 +98,34 @@ public ExecutorService getExecutor(URL url) {
"before coming to here.");
return null;
}

Integer portKey = url.getPort();
ExecutorService executor = executors.get(portKey);
if (executor != null) {
if (executor.isShutdown() || executor.isTerminated()) {
executors.remove(portKey);
executor = createExecutor(url);
executors.put(portKey, executor);
}
String shareKey = getShareKey(url);
ExecutorService executor = executors.get(shareKey);
if (executor != null && (executor.isShutdown() || executor.isTerminated())) {
executors.remove(shareKey);
// Does not re-create a shutdown executor, use SHARED_EXECUTOR for downgrade.
executor = null;
}
return executor;
if (executor == null) {
logger.warn("Executor of key + '" + shareKey + "' has shutdown, return 'DubboSharedHandler' instead.");
return SHARED_EXECUTOR;
} else {
return executor;
}
}

/**
* Consumer's executor is sharing globally when share.threadpool = true, or not shareable when share.threadpool =false.
* Provider's executor is sharing by protocol.
*
* @param url
* @return
*/
private String getShareKey(URL url) {
return isShared(url) ? SHARED_CONSUMER_EXECUTOR : url.getAddress();
}

private boolean isShared(URL url) {
return CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) && url.getParameter(SHARE_EXECUTOR_KEY, true);
}

@Override
Expand Down Expand Up @@ -159,8 +171,26 @@ public ExecutorService getSharedExecutor() {
return SHARED_EXECUTOR;
}

@Override
public void destroyAll() {
data.values().forEach(executors -> {
if (executors != null) {
executors.values().forEach(executor -> {
if (executor != null && !executor.isShutdown()) {
ExecutorUtil.shutdownNow(executor, 100);
}
});
}
});
}

private ExecutorService createExecutor(URL url) {
return (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
ExecutorService executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
if (isShared(url)) {
return new ShareableExecutor(executor);
} else {
return executor;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,8 @@ public interface ExecutorRepository {
*/
ExecutorService getSharedExecutor();

/**
* Destroy all executors that are not in shutdown state
*/
void destroyAll();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ public void testGetExecutor() {
}

private void testGet(URL url) {
Assertions.assertNull(executorRepository.getExecutor(url));

ExecutorService executorService = executorRepository.createExecutorIfAbsent(url);
executorService.shutdown();
executorService = executorRepository.createExecutorIfAbsent(url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1257,7 +1257,7 @@ public void destroy() {
destroyRegistries();
DubboShutdownHook.destroyProtocols();
destroyServiceDiscoveries();

destroyExecutorRepository();
clear();
shutdown();
release();
Expand All @@ -1268,6 +1268,10 @@ public void destroy() {
}
}

private void destroyExecutorRepository() {
ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().destroyAll();
}

private void destroyRegistries() {
AbstractRegistryFactory.destroyAll();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.ShareableExecutor;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NetUtils;
Expand All @@ -37,7 +38,9 @@
import java.util.concurrent.locks.ReentrantLock;

import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
import static org.apache.dubbo.common.constants.CommonConstants.SHARE_EXECUTOR_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;

/**
* AbstractClient
Expand All @@ -48,6 +51,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class);
private final Lock connectLock = new ReentrantLock();
private final boolean needReconnect;
//issue-7054:Consumer's executor is sharing globally.
protected volatile ExecutorService executor;
private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();

Expand Down Expand Up @@ -90,7 +94,12 @@ public AbstractClient(URL url, ChannelHandler handler) throws RemotingException
}

private void initExecutor(URL url) {
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
//issue-7054: if consumer's executor is sharing globally, thread name not require provider ip.
if (url.getParameter(SHARE_EXECUTOR_KEY, true)) {
url = url.addParameter(THREAD_NAME_KEY, CLIENT_THREAD_POOL_NAME);
} else {
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
}
url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
executor = executorRepository.createExecutorIfAbsent(url);
}
Expand Down Expand Up @@ -278,7 +287,7 @@ public void close() {
}

try {
if (executor != null) {
if (executor != null && !(executor instanceof ShareableExecutor)) {
ExecutorUtil.shutdownNow(executor, 100);
}
} catch (Throwable e) {
Expand All @@ -304,7 +313,9 @@ public void close() {

@Override
public void close(int timeout) {
ExecutorUtil.gracefulShutdown(executor, timeout);
if (!(executor instanceof ShareableExecutor)) {
ExecutorUtil.gracefulShutdown(executor, timeout);
}
close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class ThreadNameTest {
private ThreadNameVerifyHandler clientHandler;

private static String serverRegex = "DubboServerHandler\\-localhost:(\\d+)\\-thread\\-(\\d+)";
private static String clientRegex = "DubboClientHandler\\-localhost:(\\d+)\\-thread\\-(\\d+)";
private static String clientRegex = "DubboClientHandler\\-thread\\-(\\d+)";

@BeforeEach
public void before() throws Exception {
Expand Down