Skip to content

Commit

Permalink
Merge branch 'optmise-executor-repository' into '2.7.7-ctrip'
Browse files Browse the repository at this point in the history
消费者端Executor改为全局共享,不再按提供者Port共享。 (apache#7109)

See merge request framework/dubbo!66
  • Loading branch information
vio-lin committed Jun 27, 2023
2 parents 9ab313a + 2cfbf10 commit 8be898e
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.ThreadPool;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NamedThreadFactory;

import java.util.Map;
Expand Down Expand Up @@ -71,12 +72,9 @@ public DefaultExecutorRepository() {
* @return
*/
public synchronized ExecutorService createExecutorIfAbsent(URL url) {
String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
componentKey = CONSUMER_SIDE;
}
Map<Integer, ExecutorService> executors = data.computeIfAbsent(componentKey, k -> new ConcurrentHashMap<>());
Integer portKey = url.getPort();
Map<Integer, ExecutorService> executors = data.computeIfAbsent(EXECUTOR_SERVICE_COMPONENT_KEY, k -> new ConcurrentHashMap<>());
//issue-7054:Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol.
Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort();
ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(url));
// If executor has been shut down, create a new one
if (executor.isShutdown() || executor.isTerminated()) {
Expand All @@ -88,12 +86,7 @@ public synchronized ExecutorService createExecutorIfAbsent(URL url) {
}

public ExecutorService getExecutor(URL url) {
String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
componentKey = CONSUMER_SIDE;
}
Map<Integer, ExecutorService> executors = data.get(componentKey);

Map<Integer, ExecutorService> executors = data.get(EXECUTOR_SERVICE_COMPONENT_KEY);
/**
* It's guaranteed that this method is called after {@link #createExecutorIfAbsent(URL)}, so data should already
* have Executor instances generated and stored.
Expand All @@ -103,17 +96,20 @@ public ExecutorService getExecutor(URL url) {
"before coming to here.");
return null;
}

Integer portKey = url.getPort();
//issue-7054:Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol.
Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort();
ExecutorService executor = executors.get(portKey);
if (executor != null) {
if (executor.isShutdown() || executor.isTerminated()) {
executors.remove(portKey);
executor = createExecutor(url);
executors.put(portKey, executor);
}
if (executor != null && (executor.isShutdown() || executor.isTerminated())) {
executors.remove(portKey);
// Does not re-create a shutdown executor, use SHARED_EXECUTOR for downgrade.
executor = null;
logger.info("Executor for " + url + " is shutdown.");
}
if (executor == null) {
return SHARED_EXECUTOR;
} else {
return executor;
}
return executor;
}

@Override
Expand Down Expand Up @@ -159,6 +155,19 @@ public ExecutorService getSharedExecutor() {
return SHARED_EXECUTOR;
}

@Override
public void destroyAll() {
data.values().forEach(executors -> {
if (executors != null) {
executors.values().forEach(executor -> {
if (executor != null && !executor.isShutdown()) {
ExecutorUtil.shutdownNow(executor, 100);
}
});
}
});
}

private ExecutorService createExecutor(URL url) {
return (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,8 @@ public interface ExecutorRepository {
*/
ExecutorService getSharedExecutor();

/**
* Destroy all executors that are not in shutdown state
*/
void destroyAll();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.common.threadpool.manager;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

public class ExecutorRepositoryTest {
private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();

@Test
public void testGetExecutor() {
testGet(URL.valueOf("dubbo://127.0.0.1:23456"));
testGet(URL.valueOf("dubbo://127.0.0.1:23456?side=consumer"));

Assertions.assertNotNull(executorRepository.getSharedExecutor());
Assertions.assertNotNull(executorRepository.getServiceExporterExecutor());
executorRepository.nextScheduledExecutor();
}

private void testGet(URL url) {
ExecutorService executorService = executorRepository.createExecutorIfAbsent(url);
executorService.shutdown();
executorService = executorRepository.createExecutorIfAbsent(url);
Assertions.assertFalse(executorService.isShutdown());

Assertions.assertEquals(executorService, executorRepository.getExecutor(url));
executorService.shutdown();
Assertions.assertNotEquals(executorService, executorRepository.getExecutor(url));
}

@Test
public void testUpdateExecutor() {
URL url = URL.valueOf("dubbo://127.0.0.1:23456?threads=5");
ThreadPoolExecutor executorService = (ThreadPoolExecutor) executorRepository.createExecutorIfAbsent(url);

executorService.setCorePoolSize(3);
executorRepository.updateThreadpool(url, executorService);

executorService.setCorePoolSize(3);
executorService.setMaximumPoolSize(3);
executorRepository.updateThreadpool(url, executorService);

executorService.setMaximumPoolSize(20);
executorService.setCorePoolSize(10);
executorRepository.updateThreadpool(url, executorService);

executorService.setCorePoolSize(10);
executorService.setMaximumPoolSize(10);
executorRepository.updateThreadpool(url, executorService);

executorService.setCorePoolSize(5);
executorRepository.updateThreadpool(url, executorService);


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1079,7 +1079,7 @@ public void destroy() {
destroyRegistries();
DubboShutdownHook.destroyProtocols();
destroyServiceDiscoveries();

destroyExecutorRepository();
clear();
shutdown();
release();
Expand All @@ -1090,6 +1090,10 @@ public void destroy() {
}
}

private void destroyExecutorRepository() {
ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().destroyAll();
}

private void destroyRegistries() {
AbstractRegistryFactory.destroyAll();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
Expand All @@ -38,6 +37,7 @@

import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;

/**
* AbstractClient
Expand All @@ -48,6 +48,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class);
private final Lock connectLock = new ReentrantLock();
private final boolean needReconnect;
//issue-7054:Consumer's executor is sharing globally.
protected volatile ExecutorService executor;
private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();

Expand Down Expand Up @@ -89,7 +90,8 @@ public AbstractClient(URL url, ChannelHandler handler) throws RemotingException
}

private void initExecutor(URL url) {
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
//issue-7054:Consumer's executor is sharing globally, thread name not require provider ip.
url = url.addParameter(THREAD_NAME_KEY, CLIENT_THREAD_POOL_NAME);
url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
executor = executorRepository.createExecutorIfAbsent(url);
}
Expand Down Expand Up @@ -198,8 +200,8 @@ protected void connect() throws RemotingException {
} else {
if (logger.isInfoEnabled()) {
logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", channel is " + this.getChannel());
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", channel is " + this.getChannel());
}
}

Expand All @@ -208,8 +210,8 @@ protected void connect() throws RemotingException {

} catch (Throwable e) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: " + e.getMessage(), e);
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: " + e.getMessage(), e);

} finally {
connectLock.unlock();
Expand Down Expand Up @@ -261,14 +263,6 @@ public void close() {
logger.warn(e.getMessage(), e);
}

try {
if (executor != null) {
ExecutorUtil.shutdownNow(executor, 100);
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}

try {
disconnect();
} catch (Throwable e) {
Expand All @@ -284,7 +278,6 @@ public void close() {

@Override
public void close(int timeout) {
ExecutorUtil.gracefulShutdown(executor, timeout);
close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class ThreadNameTest {
private ThreadNameVerifyHandler clientHandler;

private static String serverRegex = "DubboServerHandler\\-localhost:(\\d+)\\-thread\\-(\\d+)";
private static String clientRegex = "DubboClientHandler\\-localhost:(\\d+)\\-thread\\-(\\d+)";
private static String clientRegex = "DubboClientHandler\\-thread\\-(\\d+)";

@BeforeEach
public void before() throws Exception {
Expand Down

0 comments on commit 8be898e

Please sign in to comment.