Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dubbo cloud native #4797

Merged
merged 39 commits into from
Aug 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
f51b394
Polish apache/dubbo#4542 : [Enhancement] Adapt the Java standard Even…
mercyblitz Jul 11, 2019
9e58caa
Polish apache/dubbo#4541 : [Feature] Add local File System DynamicCon…
mercyblitz Jul 15, 2019
9252be1
Polish apache#4541 : Bugfix
mercyblitz Jul 15, 2019
08089b6
Polish apache/dubbo#4541 : Optimization
mercyblitz Jul 15, 2019
cdd9338
Polish apache/dubbo#4541 : Add the compatibility for PollingWatchServ…
mercyblitz Jul 16, 2019
f0408f7
Polish apache/dubbo#4541 : Add delay publish without ThreadPoolExecutor
mercyblitz Jul 16, 2019
5f354a0
Polish apache/dubbo#4541 : Refactor the extension name
mercyblitz Jul 17, 2019
8152a75
Polish apache/dubbo#4541 : Add remove ops
mercyblitz Jul 17, 2019
df1a258
Polish apache/dubbo#4541 : Add testable constructor
mercyblitz Jul 17, 2019
52c0205
Polish apache/dubbo#4541 : Add getConfigGroups method
mercyblitz Jul 17, 2019
293a70d
Polish apache/dubbo#4610 : [Refactor] Refactor the bootstrap module
mercyblitz Jul 20, 2019
3218a1f
Polish apache/dubbo#4541 : Fix the nulling URL issue
mercyblitz Jul 20, 2019
d5c78ac
Polish apache/dubbo#4622 : [Refactor] Refactor ConfigManager
mercyblitz Jul 25, 2019
1d54a1d
Polish apache/dubbo#4622 : [Refactor] Refactor ConfigManager
mercyblitz Jul 25, 2019
5071a04
Polish apache/dubbo#4622 : Support multiple configcenters
mercyblitz Jul 26, 2019
ccf614c
Polish apache/dubbo#4671 : ServiceNameMapping will not map the group,…
mercyblitz Jul 26, 2019
87e88e2
update referenceCount log (#4683)
haiyang1985 Jul 29, 2019
5d299af
Polish /apache/dubbo#4687 : Remove the duplicated test code in dubbo-…
mercyblitz Jul 29, 2019
25ec31e
#4685 修改代码if判断false问题 if (hasException == false)修改成if (!hasException…
smipo Jul 30, 2019
c3132b8
Fixed Service annotation method parameters are not in effect (#4598)
Leishunyu Jul 31, 2019
f490df8
keep demo simple, and switch to use zookeeper as registry center (#4705)
beiwei30 Jul 31, 2019
c8dbd7d
@Reference auto-wires the instance of generic interface #4594 (#4677)
CodingSinger Jul 31, 2019
65a0140
try to shorten maven output to make travis build pass (#4710)
beiwei30 Aug 1, 2019
5b38331
use CountDownLatch to check zk registry if establish connection (#4589)
tswstarplanet Aug 1, 2019
a646874
Minor change
mercyblitz Aug 1, 2019
b8e601a
Merge remote-tracking branch 'upstream/master' into dubbo-cloud-native
mercyblitz Aug 2, 2019
0644fe1
Rename the extension name of WritableMetadataService
mercyblitz Aug 5, 2019
9f405d7
Polish apache/dubbo#4759 : [Refactor] Change the signature of methods…
mercyblitz Aug 7, 2019
a344121
Merge remote-tracking branch 'upstream/master' into dubbo-cloud-native
mercyblitz Aug 7, 2019
8c7f37a
Polish apache/dubbo#3984 : Add the implementation of Page<ServiceInst…
mercyblitz Aug 7, 2019
2a88323
Code merge
mercyblitz Aug 7, 2019
240c456
Code merge
mercyblitz Aug 8, 2019
391d053
Fix the cases
mercyblitz Aug 8, 2019
3e57a0a
Merge remote-tracking branch 'upstream/cloud-native' into dubbo-cloud…
mercyblitz Aug 8, 2019
83fc425
Merge remote-tracking branch 'upstream/cloud-native' into dubbo-cloud…
mercyblitz Aug 8, 2019
a147829
Refactor ConfigManager
mercyblitz Aug 12, 2019
eea35ce
Merge remote-tracking branch 'upstream/cloud-native' into dubbo-cloud…
mercyblitz Aug 12, 2019
e307e98
Refactor ConfigManager
mercyblitz Aug 12, 2019
f84a33d
Merge remote-tracking branch 'upstream/cloud-native' into dubbo-cloud…
mercyblitz Aug 12, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.Executors.newFixedThreadPool;

/**
* The abstract implementation of {@link DynamicConfiguration}
*
Expand All @@ -39,11 +38,21 @@ public abstract class AbstractDynamicConfiguration implements DynamicConfigurati

public static final String THREAD_POOL_PREFIX_PARAM_NAME = PARAM_NAME_PREFIX + "thread-pool.prefix";

public static final String DEFAULT_THREAD_POOL_PREFIX = PARAM_NAME_PREFIX + "workers";

public static final String THREAD_POOL_SIZE_PARAM_NAME = PARAM_NAME_PREFIX + "thread-pool.size";

public static final String DEFAULT_THREAD_POOL_PREFIX = PARAM_NAME_PREFIX + "workers";
/**
* The keep alive time in milliseconds for threads in {@link ThreadPoolExecutor}
*/
public static final String THREAD_POOL_KEEP_ALIVE_TIME_PARAM_NAME = PARAM_NAME_PREFIX + "thread-pool.keep-alive-time";

public static final int DEFAULT_THREAD_POOL_SIZE = 1;

public static final String DEFAULT_THREAD_POOL_SIZE = "1";
/**
* Default keep alive time in milliseconds for threads in {@link ThreadPoolExecutor} is 1 minute( 60 * 1000 ms)
*/
public static final long DEFAULT_THREAD_POOL_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1);

/**
* Logger
Expand All @@ -55,8 +64,18 @@ public abstract class AbstractDynamicConfiguration implements DynamicConfigurati
*/
private final ThreadPoolExecutor workersThreadPool;

public AbstractDynamicConfiguration() {
this(DEFAULT_THREAD_POOL_PREFIX, DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_KEEP_ALIVE_TIME);
}

public AbstractDynamicConfiguration(URL url) {
this.workersThreadPool = initWorkersThreadPool(url);
this(getThreadPoolPrefixName(url), getThreadPoolSize(url), getThreadPoolKeepAliveTime(url));
}

public AbstractDynamicConfiguration(String threadPoolPrefixName,
int threadPoolSize,
long keepAliveTime) {
this.workersThreadPool = initWorkersThreadPool(threadPoolPrefixName, threadPoolSize, keepAliveTime);
}

@Override
Expand Down Expand Up @@ -94,9 +113,9 @@ public final void close() throws Exception {
* @param key the key
* @param group the group
* @return if found, return the content of configuration
* @throws IllegalStateException
* @throws Exception If met with some problems
*/
protected abstract String doGetConfig(String key, String group) throws IllegalStateException;
protected abstract String doGetConfig(String key, String group) throws Exception;

/**
* Close the resources if necessary
Expand Down Expand Up @@ -144,6 +163,10 @@ protected final <V> V execute(Callable<V> task, long timeout) {
return value;
}

protected ThreadPoolExecutor getWorkersThreadPool() {
return workersThreadPool;
}

private void doFinally() {
shutdownWorkersThreadPool();
}
Expand All @@ -154,18 +177,23 @@ private void shutdownWorkersThreadPool() {
}
}

protected ThreadPoolExecutor initWorkersThreadPool(URL url) {
int size = getThreadPoolSize(url);
String name = getThreadPoolPrefixName(url);
return (ThreadPoolExecutor) newFixedThreadPool(size, new NamedThreadFactory(name));
protected ThreadPoolExecutor initWorkersThreadPool(String threadPoolPrefixName,
int threadPoolSize,
long keepAliveTime) {
return new ThreadPoolExecutor(threadPoolSize, threadPoolSize, keepAliveTime,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory(threadPoolPrefixName));
}

protected static String getThreadPoolPrefixName(URL url) {
return getParameter(url, THREAD_POOL_PREFIX_PARAM_NAME, DEFAULT_THREAD_POOL_PREFIX);
}

protected static int getThreadPoolSize(URL url) {
return Integer.parseInt(getParameter(url, THREAD_POOL_SIZE_PARAM_NAME, DEFAULT_THREAD_POOL_SIZE));
return url.getParameter(THREAD_POOL_SIZE_PARAM_NAME, DEFAULT_THREAD_POOL_SIZE);
}

protected static long getThreadPoolKeepAliveTime(URL url) {
return url.getParameter(THREAD_POOL_KEEP_ALIVE_TIME_PARAM_NAME, DEFAULT_THREAD_POOL_KEEP_ALIVE_TIME);
}

protected static String getParameter(URL url, String name, String defaultValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@
* <li>{@link #addListener(String, String, ConfigurationListener)}/ {@link #removeListener(String, String, ConfigurationListener)}
* , add or remove listeners for governance rules or config items that need to watch.</li>
* <li>{@link #getProperty(String, Object)}, get a single config item.</li>
* <li>{@link #getConfig(String, String, long)}, get the specified config</li>
* </ol>
*
* @see AbstractDynamicConfiguration
*/
public interface DynamicConfiguration extends Configuration, AutoCloseable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.dubbo.common.config.configcenter.file;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.configcenter.AbstractDynamicConfiguration;
import org.apache.dubbo.common.config.configcenter.ConfigChangeEvent;
import org.apache.dubbo.common.config.configcenter.ConfigChangeType;
import org.apache.dubbo.common.config.configcenter.ConfigurationListener;
Expand Down Expand Up @@ -51,10 +52,8 @@
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -66,7 +65,6 @@
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import static java.util.Collections.emptySet;
import static java.util.Collections.unmodifiableMap;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.io.FileUtils.readFileToString;
Expand All @@ -77,23 +75,15 @@
*
* @since 2.7.4
*/
public class FileSystemDynamicConfiguration implements DynamicConfiguration {

public static final String PARAM_NAME_PREFIX = "dubbo.config-center.";
public class FileSystemDynamicConfiguration extends AbstractDynamicConfiguration {

public static final String CONFIG_CENTER_DIR_PARAM_NAME = PARAM_NAME_PREFIX + "dir";

public static final String THREAD_POOL_PREFIX_PARAM_NAME = PARAM_NAME_PREFIX + "thread-pool.prefix";

public static final String THREAD_POOL_SIZE_PARAM_NAME = PARAM_NAME_PREFIX + "thread-pool.size";

public static final String CONFIG_CENTER_ENCODING_PARAM_NAME = PARAM_NAME_PREFIX + "encoding";

public static final String DEFAULT_CONFIG_CENTER_DIR_PATH = System.getProperty("user.home") + File.separator
+ ".dubbo" + File.separator + "config-center";

public static final String DEFAULT_THREAD_POOL_PREFIX = PARAM_NAME_PREFIX + "workers";

public static final String DEFAULT_THREAD_POOL_SIZE = "1";

public static final String DEFAULT_CONFIG_CENTER_ENCODING = "UTF-8";
Expand All @@ -112,6 +102,7 @@ public class FileSystemDynamicConfiguration implements DynamicConfiguration {
*/
private static final Log logger = LogFactory.getLog(FileSystemDynamicConfiguration.class);


/**
* The unmodifiable map for {@link ConfigChangeType} whose key is the {@link WatchEvent.Kind#name() name} of
* {@link WatchEvent.Kind WatchEvent's Kind}
Expand Down Expand Up @@ -166,11 +157,6 @@ public class FileSystemDynamicConfiguration implements DynamicConfiguration {

private final String encoding;

/**
* The thread pool for workers who executes the tasks
*/
private final ThreadPoolExecutor workersThreadPool;

/**
* The {@link Set} of {@link #configDirectory(String) directories} that may be processing,
* <p>
Expand All @@ -184,16 +170,17 @@ public class FileSystemDynamicConfiguration implements DynamicConfiguration {
private final Map<File, List<ConfigurationListener>> listenersRepository;

public FileSystemDynamicConfiguration(URL url) {
this(initDirectory(url), getEncoding(url), getThreadPoolPrefixName(url), getThreadPoolSize(url));
this(initDirectory(url), getEncoding(url), getThreadPoolPrefixName(url), getThreadPoolSize(url),
getThreadPoolKeepAliveTime(url));
}

public FileSystemDynamicConfiguration(File rootDirectory, String encoding,
String threadPoolPrefixName,
int threadPoolSize
) {
int threadPoolSize,
long keepAliveTime) {
super(threadPoolPrefixName, threadPoolSize, keepAliveTime);
this.rootDirectory = rootDirectory;
this.encoding = encoding;
this.workersThreadPool = initWorkersThreadPool(threadPoolPrefixName, threadPoolSize);
this.processingDirectories = initProcessingDirectories();
this.listenersRepository = new LinkedHashMap<>();
}
Expand Down Expand Up @@ -231,12 +218,6 @@ public void removeListener(String key, String group, ConfigurationListener liste
});
}

@Override
public String getConfig(String key, String group, long timeout) throws IllegalStateException {
File configFile = configFile(key, group);
return getConfig(configFile, timeout);
}

protected File configDirectory(String group) {
String actualGroup = isBlank(group) ? DEFAULT_GROUP : group;
return new File(rootDirectory, actualGroup);
Expand Down Expand Up @@ -335,7 +316,7 @@ private List<ConfigurationListener> getListeners(File configFile) {

private void fireConfigChangeEvent(File configFile, ConfigChangeType configChangeType) {
String key = configFile.getName();
String value = getConfig(configFile, -1L);
String value = getConfig(configFile);
// fire ConfigChangeEvent one by one
getListeners(configFile).forEach(listener -> {
try {
Expand All @@ -348,10 +329,6 @@ private void fireConfigChangeEvent(File configFile, ConfigChangeType configChang
});
}

protected String getConfig(File configFile, long timeout) {
return canRead(configFile) ? execute(() -> readFileToString(configFile, getEncoding()), timeout) : null;
}

private boolean canRead(File file) {
return file.exists() && file.canRead();
}
Expand All @@ -373,18 +350,14 @@ public boolean publishConfig(String key, String group, String content) {
public String removeConfig(String key, String group) {
return delay(key, group, configFile -> {

String content = getConfig(configFile, -1L);
String content = getConfig(configFile);

FileUtils.deleteQuietly(configFile);

return content;
});
}

private ThreadPoolExecutor initWorkersThreadPool(String prefix, int size) {
return (ThreadPoolExecutor) newFixedThreadPool(size, new NamedThreadFactory(prefix));
}

/**
* Delay action for {@link #configFile(String, String) config file}
*
Expand Down Expand Up @@ -470,36 +443,25 @@ public SortedMap<String, String> getConfigs(String group) throws UnsupportedOper
}

@Override
public void close() throws Exception {
// TODO
protected String doGetConfig(String key, String group) throws Exception {
File configFile = configFile(key, group);
return getConfig(configFile);
}

private <V> V execute(Callable<V> task, long timeout) {
V value = null;
try {
protected String getConfig(File configFile) {
return ThrowableFunction.execute(configFile,
file -> canRead(configFile) ? readFileToString(configFile, getEncoding()) : null);
}

@Override
protected void doClose() throws Exception {

if (timeout < 1) { // less or equal 0
value = task.call();
} else {
Future<V> future = workersThreadPool.submit(task);
value = future.get(timeout, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
if (logger.isErrorEnabled()) {
logger.error(e.getMessage(), e);
}
}
return value;
}

protected File getRootDirectory() {
return rootDirectory;
}

protected ThreadPoolExecutor getWorkersThreadPool() {
return workersThreadPool;
}

protected String getEncoding() {
return encoding;
}
Expand All @@ -520,14 +482,14 @@ protected static boolean isBasedPoolingWatchService() {
return basedPoolingWatchService;
}

private static String getThreadPoolPrefixName(URL url) {
return getParameter(url, THREAD_POOL_PREFIX_PARAM_NAME, DEFAULT_THREAD_POOL_PREFIX);
}

protected static ThreadPoolExecutor getWatchEventsLoopThreadPool() {
return watchEventsLoopThreadPool;
}

protected ThreadPoolExecutor getWorkersThreadPool() {
return super.getWorkersThreadPool();
}

private <V> V executeMutually(Object mutex, Callable<V> callable) {
V value = null;
synchronized (mutex) {
Expand Down Expand Up @@ -592,7 +554,7 @@ private static Optional<WatchService> newWatchService() {
return watchService;
}

private static File initDirectory(URL url) {
protected static File initDirectory(URL url) {
String directoryPath = getParameter(url, CONFIG_CENTER_DIR_PARAM_NAME, DEFAULT_CONFIG_CENTER_DIR_PATH);
File rootDirectory = new File(getParameter(url, CONFIG_CENTER_DIR_PARAM_NAME, DEFAULT_CONFIG_CENTER_DIR_PATH));
if (!rootDirectory.exists() && !rootDirectory.mkdirs()) {
Expand All @@ -602,21 +564,10 @@ private static File initDirectory(URL url) {
return rootDirectory;
}

private static String getParameter(URL url, String name, String defaultValue) {
if (url != null) {
return url.getParameter(name, defaultValue);
}
return defaultValue;
}

private static String getEncoding(URL url) {
protected static String getEncoding(URL url) {
return getParameter(url, CONFIG_CENTER_ENCODING_PARAM_NAME, DEFAULT_CONFIG_CENTER_ENCODING);
}

private static int getThreadPoolSize(URL url) {
return Integer.parseInt(getParameter(url, THREAD_POOL_SIZE_PARAM_NAME, DEFAULT_THREAD_POOL_SIZE));
}

private static ThreadPoolExecutor newWatchEventsLoopThreadPool() {
return new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE,
0L, MILLISECONDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
import org.apache.dubbo.common.utils.ReflectUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.config.context.ConfigConfigurationAdapter;
import org.apache.dubbo.config.context.ConfigManager;
import org.apache.dubbo.config.support.Parameter;
import org.apache.dubbo.rpc.model.ConsumerMethodModel;

import javax.annotation.PostConstruct;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
Expand Down Expand Up @@ -646,4 +648,17 @@ public boolean equals(Object obj) {
}
return true;
}

/**
* Add {@link AbstractConfig current instance} into {@link ConfigManager}
* <p>
* Current method will invoked by Spring or Java EE container automatically, or should be triggered manually.
*
* @see ConfigManager#addConfig(AbstractConfig)
* @since 2.7.4
*/
@PostConstruct
public void addIntoConfigManager() {
ConfigManager.getInstance().addConfig(this);
}
}
Loading