Skip to content

Commit

Permalink
add new thread pool strategy for Consumer, allow all service calls to…
Browse files Browse the repository at this point in the history
… share the same thread pool. (#5891)
  • Loading branch information
chickenlj authored Mar 27, 2020
1 parent bada690 commit 585bb11
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,10 @@ public class Constants {

public static final String EXECUTOR_SERVICE_COMPONENT_KEY = ExecutorService.class.getName();

public static final String SHARE_EXECUTOR_KEY = "share.threadpool";

public static final String SHARED_CONSUMER_EXECUTOR_PORT = "consumer.executor.port";

public static final String GENERIC_SERIALIZATION_NATIVE_JAVA = "nativejava";

public static final String GENERIC_SERIALIZATION_DEFAULT = "true";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ public Object get(String componentName, String key) {

@Override
public void put(String componentName, String key, Object value) {
Map<String, Object> componentData = data.get(componentName);
ConcurrentMap<String, Object> componentData = data.get(componentName);
if (null == componentData) {
data.putIfAbsent(componentName, new ConcurrentHashMap<String, Object>());
componentData = data.get(componentName);
}
componentData.put(key, value);
componentData.putIfAbsent(key, value);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static com.alibaba.dubbo.common.Constants.SHARED_CONSUMER_EXECUTOR_PORT;
import static com.alibaba.dubbo.common.Constants.SHARE_EXECUTOR_KEY;

/**
* AbstractClient
*/
Expand Down Expand Up @@ -105,10 +108,14 @@ public AbstractClient(URL url, ChannelHandler handler) throws RemotingException
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}

boolean shouldShareExecutor = url.getParameter(SHARE_EXECUTOR_KEY, false);
String portKey = shouldShareExecutor ? SHARED_CONSUMER_EXECUTOR_PORT : Integer.toString(url.getPort());
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
.getDefaultExtension().get(Constants.CONSUMER_SIDE, portKey);
if (!shouldShareExecutor) {
ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().remove(Constants.CONSUMER_SIDE, portKey);
}
}

protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static com.alibaba.dubbo.common.Constants.SHARED_CONSUMER_EXECUTOR_PORT;
import static com.alibaba.dubbo.common.Constants.SHARE_EXECUTOR_KEY;

public class WrappedChannelHandler implements ChannelHandlerDelegate {

protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class);
Expand All @@ -44,17 +47,32 @@ public class WrappedChannelHandler implements ChannelHandlerDelegate {

protected final URL url;

protected DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();

public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
String componentKey;
if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
componentKey = Constants.CONSUMER_SIDE;
if (url.getParameter(SHARE_EXECUTOR_KEY, false)) {
ExecutorService cExecutor = (ExecutorService) dataStore.get(componentKey, SHARED_CONSUMER_EXECUTOR_PORT);
if (cExecutor == null) {
cExecutor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
dataStore.put(componentKey, SHARED_CONSUMER_EXECUTOR_PORT, cExecutor);
cExecutor = (ExecutorService) dataStore.get(componentKey, SHARED_CONSUMER_EXECUTOR_PORT);
}
executor = cExecutor;
} else {
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
} else {
componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}

public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.transport.dispatcher.WrappedChannelHandler;

import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;

import java.lang.reflect.Field;
import java.util.concurrent.ThreadPoolExecutor;

import static org.junit.Assert.fail;

Expand Down Expand Up @@ -105,6 +105,30 @@ public void test_Caught_Biz_Error() throws RemotingException {
}
}

@Test
public void testShareConsumerExecutor() {
URL url1 = URL.valueOf("dubbo://10.20.30.40:1234/DemoService?side=consumer&share.threadpool=true&threadpool=fixed");
WrappedChannelHandler handler1 = new WrappedChannelHandler(new BizChannelHander(true), url1);
WrappedChannelHandler handler2 = new WrappedChannelHandler(new BizChannelHander(true), url1);
Assert.assertEquals(200, ((ThreadPoolExecutor) (handler1.getExecutor())).getMaximumPoolSize());
Assert.assertSame(handler1.getExecutor(), handler2.getExecutor());

URL url2 = URL.valueOf("dubbo://10.20.30.40:1234/DemoService?side=consumer&share.threadpool=false");
WrappedChannelHandler handler3 = new WrappedChannelHandler(new BizChannelHander(true), url2);
WrappedChannelHandler handler4 = new WrappedChannelHandler(new BizChannelHander(true), url2);
Assert.assertNotSame(handler3.getExecutor(), handler4.getExecutor());
Assert.assertNotSame(handler3.getExecutor(), handler1.getExecutor());
}

@Test
public void testProviderExecutor() {
URL url1 = URL.valueOf("dubbo://10.20.30.40:1234/DemoService?side=provider");
URL url2 = URL.valueOf("dubbo://10.20.30.40:6789/DemoService?side=provider");
WrappedChannelHandler handler1 = new WrappedChannelHandler(new BizChannelHander(true), url1);
WrappedChannelHandler handler2 = new WrappedChannelHandler(new BizChannelHander(true), url2);
Assert.assertNotSame(handler1.getExecutor(), handler2.getExecutor());
}

class BizChannelHander extends MockedChannelHandler {
private boolean invokeWithBizError;

Expand Down

0 comments on commit 585bb11

Please sign in to comment.