From a0a84222d71f6e16efb70805e9d355280e1aea8c Mon Sep 17 00:00:00 2001 From: cvictory Date: Fri, 25 Jan 2019 10:54:45 +0800 Subject: [PATCH 1/9] fix #2842. remove duplicate SPI definitions for 2.7.x --- dubbo-all/pom.xml | 12 ++++++++++++ dubbo-container/dubbo-container-api/pom.xml | 3 +-- pom.xml | 1 - 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/dubbo-all/pom.xml b/dubbo-all/pom.xml index 8c46af627db..15fd731d706 100644 --- a/dubbo-all/pom.xml +++ b/dubbo-all/pom.xml @@ -614,6 +614,18 @@ + + + org.apache.dubbo:dubbo + + + com/** + org/** + + META-INF/dubbo/** + + + diff --git a/dubbo-container/dubbo-container-api/pom.xml b/dubbo-container/dubbo-container-api/pom.xml index 2e1f544064f..2c1e9100af5 100644 --- a/dubbo-container/dubbo-container-api/pom.xml +++ b/dubbo-container/dubbo-container-api/pom.xml @@ -47,9 +47,8 @@ org.apache.dubbo.container.Main - true - \ No newline at end of file + diff --git a/pom.xml b/pom.xml index 4cbb67e5b46..8f1a39b7201 100644 --- a/pom.xml +++ b/pom.xml @@ -389,7 +389,6 @@ ${project.version} - true From 19824c3a9d436b5a19bb9cc211935ae5750d2314 Mon Sep 17 00:00:00 2001 From: cvictory Date: Thu, 21 Feb 2019 10:42:06 +0800 Subject: [PATCH 2/9] fix: rename the thread name from DubboRegistryFailedRetryTimer to DubboMetadataReportRetryTimer in MetadataReportRetry --- .../dubbo/metadata/support/AbstractMetadataReport.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dubbo-metadata-report/dubbo-metadata-report-api/src/main/java/org/apache/dubbo/metadata/support/AbstractMetadataReport.java b/dubbo-metadata-report/dubbo-metadata-report-api/src/main/java/org/apache/dubbo/metadata/support/AbstractMetadataReport.java index 6080fa58f82..95b44f44927 100644 --- a/dubbo-metadata-report/dubbo-metadata-report-api/src/main/java/org/apache/dubbo/metadata/support/AbstractMetadataReport.java +++ b/dubbo-metadata-report/dubbo-metadata-report-api/src/main/java/org/apache/dubbo/metadata/support/AbstractMetadataReport.java @@ -62,7 +62,7 @@ public abstract class AbstractMetadataReport implements MetadataReport { // Log output protected final Logger logger = LoggerFactory.getLogger(getClass()); - // Local disk cache, where the special key value.registries records the list of registry centers, and the others are the list of notified service providers + // Local disk cache, where the special key value.registries records the list of metadata centers, and the others are the list of notified service providers final Properties properties = new Properties(); private final ExecutorService reportCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveMetadataReport", true)); final Map allMetadataReports = new ConcurrentHashMap<>(4); @@ -336,7 +336,7 @@ long calculateStartTime() { class MetadataReportRetry { protected final Logger logger = LoggerFactory.getLogger(getClass()); - final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(0, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true)); + final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(0, new NamedThreadFactory("DubboMetadataReportRetryTimer", true)); volatile ScheduledFuture retryScheduledFuture; AtomicInteger retryCounter = new AtomicInteger(0); // retry task schedule period @@ -358,7 +358,7 @@ void startRetryTask() { retryScheduledFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { - // Check and connect to the registry + // Check and connect to the metadata try { int times = retryCounter.incrementAndGet(); logger.info("start to retry task for metadata report. retry times:" + times); From 698244495cb3f738707d814fabd53f2b60b5e364 Mon Sep 17 00:00:00 2001 From: cvictory Date: Wed, 27 Feb 2019 15:56:52 +0800 Subject: [PATCH 3/9] fix #3288. share zookeeper connection. --- .../dubbo-configcenter-zookeeper/pom.xml | 15 +-- .../support/zookeeper/CacheListener.java | 92 ++++++++----------- .../ZookeeperDynamicConfiguration.java | 72 ++------------- .../ZookeeperDynamicConfigurationFactory.java | 11 ++- .../dubbo-remoting-zookeeper/pom.xml | 6 +- .../remoting/zookeeper/DataListener.java | 11 +++ .../dubbo/remoting/zookeeper/EventType.java | 43 +++++++++ .../remoting/zookeeper/ZookeeperClient.java | 6 ++ .../curator/CuratorZookeeperClient.java | 72 ++++++++++++--- .../support/AbstractZookeeperClient.java | 24 ++++- .../zookeeper/zkclient/ZkClientWrapper.java | 13 +++ .../zkclient/ZkclientZookeeperClient.java | 26 +++++- 12 files changed, 249 insertions(+), 142 deletions(-) create mode 100644 dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/DataListener.java create mode 100644 dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml b/dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml index bb9e1ad5d45..5c84f6515ac 100644 --- a/dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml +++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/pom.xml @@ -33,16 +33,9 @@ ${project.parent.version} - org.apache.curator - curator-framework - - - org.apache.curator - curator-recipes - - - org.apache.zookeeper - zookeeper + org.apache.dubbo + dubbo-remoting-zookeeper + ${project.parent.version} org.apache.curator @@ -50,4 +43,4 @@ test - \ No newline at end of file + diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java index 1851a22b2f0..71587b17cc3 100644 --- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java +++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java @@ -21,13 +21,9 @@ import org.apache.dubbo.configcenter.ConfigChangeEvent; import org.apache.dubbo.configcenter.ConfigChangeType; import org.apache.dubbo.configcenter.ConfigurationListener; +import org.apache.dubbo.remoting.zookeeper.DataListener; +import org.apache.dubbo.remoting.zookeeper.EventType; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; -import org.apache.curator.framework.recipes.cache.TreeCacheListener; - -import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -37,9 +33,8 @@ /** * */ -public class CacheListener implements TreeCacheListener { - private static final byte[] EMPTY_BYTES = new byte[0]; +public class CacheListener implements DataListener { private Map> keyListeners = new ConcurrentHashMap<>(); private CountDownLatch initializedLatch; private String rootPath; @@ -49,76 +44,69 @@ public CacheListener(String rootPath, CountDownLatch initializedLatch) { this.initializedLatch = initializedLatch; } - @Override - public void childEvent(CuratorFramework aClient, TreeCacheEvent event) throws Exception { + public void addListener(String key, ConfigurationListener configurationListener) { + Set listeners = this.keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>()); + listeners.add(configurationListener); + } + + public void removeListener(String key, ConfigurationListener configurationListener) { + Set listeners = this.keyListeners.get(key); + if (listeners != null) { + listeners.remove(configurationListener); + } + } + + /** + * This is used to convert a configuration nodePath into a key + * TODO doc + * + * @param path + * @return key (nodePath less the config root path) + */ + private String pathToKey(String path) { + if (StringUtils.isEmpty(path)) { + return path; + } + return path.replace(rootPath + "/", "").replaceAll("/", "."); + } + - TreeCacheEvent.Type type = event.getType(); - ChildData data = event.getData(); - if (type == TreeCacheEvent.Type.INITIALIZED) { + @Override + public void dataChanged(String path, Object value, EventType eventType) { + if (eventType == null) { initializedLatch.countDown(); return; } - // TODO, ignore other event types - if (data == null) { + if (value == null) { return; } // TODO We limit the notification of config changes to a specific path level, for example // /dubbo/config/service/configurators, other config changes not in this level will not get notified, // say /dubbo/config/dubbo.properties - if (data.getPath().split("/").length >= 5) { - byte[] value = data.getData(); - String key = pathToKey(data.getPath()); + if (path.split("/").length >= 5) { + String key = pathToKey(path); ConfigChangeType changeType; - switch (type) { - case NODE_ADDED: + switch (eventType) { + case NodeCreated: changeType = ConfigChangeType.ADDED; break; - case NODE_REMOVED: + case NodeDeleted: changeType = ConfigChangeType.DELETED; break; - case NODE_UPDATED: + case NodeDataChanged: changeType = ConfigChangeType.MODIFIED; break; default: return; } - if (value == null) { - value = EMPTY_BYTES; - } - ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(key, new String(value, StandardCharsets.UTF_8), changeType); + ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(key, (String) value, changeType); Set listeners = keyListeners.get(key); if (CollectionUtils.isNotEmpty(listeners)) { listeners.forEach(listener -> listener.process(configChangeEvent)); } } } - - public void addListener(String key, ConfigurationListener configurationListener) { - Set listeners = this.keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>()); - listeners.add(configurationListener); - } - - public void removeListener(String key, ConfigurationListener configurationListener) { - Set listeners = this.keyListeners.get(key); - if (listeners != null) { - listeners.remove(configurationListener); - } - } - - /** - * This is used to convert a configuration nodePath into a key - * TODO doc - * - * @param path - * @return key (nodePath less the config root path) - */ - private String pathToKey(String path) { - if (StringUtils.isEmpty(path)) { - return path; - } - return path.replace(rootPath + "/", "").replaceAll("/", "."); - } } diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java index 7a106f86dec..bf74e8dc4db 100644 --- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java +++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java @@ -16,28 +16,18 @@ */ package org.apache.dubbo.configcenter.support.zookeeper; -import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.utils.NamedThreadFactory; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.configcenter.ConfigurationListener; import org.apache.dubbo.configcenter.DynamicConfiguration; +import org.apache.dubbo.remoting.zookeeper.ZookeeperClient; +import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter; -import org.apache.curator.RetryPolicy; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.TreeCache; -import org.apache.curator.retry.ExponentialBackoffRetry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import static org.apache.curator.framework.CuratorFrameworkFactory.newClient; import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY; /** @@ -45,53 +35,25 @@ */ public class ZookeeperDynamicConfiguration implements DynamicConfiguration { private static final Logger logger = LoggerFactory.getLogger(ZookeeperDynamicConfiguration.class); - private Executor executor; - private CuratorFramework client; // The final root path would be: /configRootPath/"config" private String rootPath; - private TreeCache treeCache; + private final ZookeeperClient zkClient; private CountDownLatch initializedLatch; private CacheListener cacheListener; private URL url; - ZookeeperDynamicConfiguration(URL url) { + ZookeeperDynamicConfiguration(URL url, ZookeeperTransporter zookeeperTransporter) { this.url = url; rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config"; - RetryPolicy policy = new ExponentialBackoffRetry(1000, 3); - int sessionTimeout = url.getParameter("config.session.timeout", 60 * 1000); - int connectTimeout = url.getParameter("config.connect.timeout", 10 * 1000); - String connectString = url.getBackupAddress(); - client = newClient(connectString, sessionTimeout, connectTimeout, policy); - client.start(); - - try { - boolean connected = client.blockUntilConnected(3 * connectTimeout, TimeUnit.MILLISECONDS); - if (!connected) { - if (url.getParameter(Constants.CONFIG_CHECK_KEY, true)) { - throw new IllegalStateException("Failed to connect to config center (zookeeper): " - + connectString + " in " + 3 * connectTimeout + "ms."); - } else { - logger.warn("The config center (zookeeper) is not fully initialized in " + 3 * connectTimeout + "ms, address is: " + connectString); - } - } - } catch (InterruptedException e) { - throw new IllegalStateException("The thread was interrupted unexpectedly when trying connecting to zookeeper " - + connectString + " config center, ", e); - } - initializedLatch = new CountDownLatch(1); this.cacheListener = new CacheListener(rootPath, initializedLatch); - this.executor = Executors.newFixedThreadPool(1, new NamedThreadFactory(this.getClass().getSimpleName(), true)); - // build local cache - try { - this.buildCache(); - } catch (Exception e) { - logger.warn("Failed to build local cache for config center (zookeeper), address is ." + connectString); - } + + zkClient = zookeeperTransporter.connect(url); + zkClient.addDataListener(rootPath, cacheListener); } /** @@ -100,11 +62,7 @@ public class ZookeeperDynamicConfiguration implements DynamicConfiguration { */ @Override public Object getInternalProperty(String key) { - ChildData childData = treeCache.getCurrentData(key); - if (childData != null) { - return new String(childData.getData(), StandardCharsets.UTF_8); - } - return null; + return zkClient.getContent(key); } /** @@ -141,18 +99,4 @@ public String getConfig(String key, String group, long timeout) throws IllegalSt return (String) getInternalProperty(rootPath + "/" + key); } - - /** - * Adds a listener to the pathChildrenCache, initializes the cache, then starts the cache-management background - * thread - */ - private void buildCache() throws Exception { - this.treeCache = new TreeCache(client, rootPath); - // create the watcher for future configuration updates - treeCache.getListenable().addListener(cacheListener, executor); - - // it's not blocking, so we use an extra latch 'initializedLatch' to make sure cache fully initialized before use. - treeCache.start(); - initializedLatch.await(); - } } diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java index 7994e0461f3..4d78133dbaf 100644 --- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java +++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationFactory.java @@ -19,13 +19,22 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.configcenter.AbstractDynamicConfigurationFactory; import org.apache.dubbo.configcenter.DynamicConfiguration; +import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter; /** * */ public class ZookeeperDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory { + + private ZookeeperTransporter zookeeperTransporter; + + public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) { + this.zookeeperTransporter = zookeeperTransporter; + } + + @Override protected DynamicConfiguration createDynamicConfiguration(URL url) { - return new ZookeeperDynamicConfiguration(url); + return new ZookeeperDynamicConfiguration(url, zookeeperTransporter); } } diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml b/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml index 97ce12adc23..587f1822ceb 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml +++ b/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml @@ -46,10 +46,14 @@ org.apache.curator curator-framework + + org.apache.curator + curator-recipes + org.apache.curator curator-test test - \ No newline at end of file + diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/DataListener.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/DataListener.java new file mode 100644 index 00000000000..a9400e86ffd --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/DataListener.java @@ -0,0 +1,11 @@ +package org.apache.dubbo.remoting.zookeeper; + +import java.util.List; + +/** + * @author cvictory ON 2019-02-26 + */ +public interface DataListener { + + void dataChanged(String path, Object value, EventType eventType); +} diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java new file mode 100644 index 00000000000..4ffba571c50 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java @@ -0,0 +1,43 @@ +package org.apache.dubbo.remoting.zookeeper; + +import org.apache.zookeeper.Watcher; + +/** + * 2019-02-26 + */ +public enum EventType { + None(-1), + NodeCreated(1), + NodeDeleted(2), + NodeDataChanged(3), + NodeChildrenChanged(4); + + private final int intValue; // Integer representation of value + // for sending over wire + + EventType(int intValue) { + this.intValue = intValue; + } + + public int getIntValue() { + return intValue; + } + + public static Watcher.Event.EventType fromInt(int intValue) { + switch (intValue) { + case -1: + return Watcher.Event.EventType.None; + case 1: + return Watcher.Event.EventType.NodeCreated; + case 2: + return Watcher.Event.EventType.NodeDeleted; + case 3: + return Watcher.Event.EventType.NodeDataChanged; + case 4: + return Watcher.Event.EventType.NodeChildrenChanged; + + default: + throw new RuntimeException("Invalid integer value for conversion to EventType"); + } + } +} diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java index b6875ee8e52..7e6f96fa562 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java @@ -30,6 +30,12 @@ public interface ZookeeperClient { List addChildListener(String path, ChildListener listener); + /** + * @param path: directory. All of child of path will be listened. + * @param listener + */ + void addDataListener(String path, DataListener listener); + void removeChildListener(String path, ChildListener listener); void addStateListener(StateListener listener); diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java index a78edda76a6..701db5d033c 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java @@ -19,15 +19,22 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryNTimes; + import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.remoting.zookeeper.ChildListener; +import org.apache.dubbo.remoting.zookeeper.DataListener; +import org.apache.dubbo.remoting.zookeeper.EventType; import org.apache.dubbo.remoting.zookeeper.StateListener; import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient; + import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NodeExistsException; @@ -37,9 +44,9 @@ import java.util.Collections; import java.util.List; -public class CuratorZookeeperClient extends AbstractZookeeperClient { +public class CuratorZookeeperClient extends AbstractZookeeperClient { - private final Charset charset = Charset.forName("UTF-8"); + static final Charset charset = Charset.forName("UTF-8"); private final CuratorFramework client; @@ -172,8 +179,8 @@ public void doClose() { } @Override - public CuratorWatcher createTargetChildListener(String path, ChildListener listener) { - return new CuratorWatcherImpl(listener); + public CuratorZookeeperClient.CuratorWatcherImpl createTargetChildListener(String path, ChildListener listener) { + return new CuratorZookeeperClient.CuratorWatcherImpl(client, listener); } @Override @@ -187,28 +194,51 @@ public List addTargetChildListener(String path, CuratorWatcher listener) } } + @Override + protected PathChildrenCacheListener createTargetDataListener(String path, DataListener listener) { + return new CuratorWatcherImpl(client, listener); + } + + @Override + protected void addTargetDataListener(String path, PathChildrenCacheListener pathChildrenCacheListener) { + try { + PathChildrenCache pathcache = new PathChildrenCache(client, "/some/path", true); + pathcache.getListenable().addListener(pathChildrenCacheListener); + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + @Override public void removeTargetChildListener(String path, CuratorWatcher listener) { ((CuratorWatcherImpl) listener).unwatch(); } - private class CuratorWatcherImpl implements CuratorWatcher { + static class CuratorWatcherImpl implements CuratorWatcher, PathChildrenCacheListener { + + private CuratorFramework client; + private volatile ChildListener childListener; + private volatile DataListener dataListener; - private volatile ChildListener listener; - public CuratorWatcherImpl(ChildListener listener) { - this.listener = listener; + public CuratorWatcherImpl(CuratorFramework client, ChildListener listener) { + this.client = client; + this.childListener = listener; + } + + public CuratorWatcherImpl(CuratorFramework client, DataListener dataListener) { + this.dataListener = dataListener; } public void unwatch() { - this.listener = null; + this.childListener = null; } @Override public void process(WatchedEvent event) throws Exception { - if (listener != null) { + if (childListener != null) { String path = event.getPath() == null ? "" : event.getPath(); - listener.childChanged(path, + childListener.childChanged(path, // if path is null, curator using watcher will throw NullPointerException. // if client connect or disconnect to server, zookeeper will queue // watched event(Watcher.Event.EventType.None, .., path = null). @@ -217,6 +247,26 @@ public void process(WatchedEvent event) throws Exception { : Collections.emptyList()); } } + + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { + if (dataListener != null) { + PathChildrenCacheEvent.Type type = event.getType(); + EventType eventType = null; + switch (type) { + case CHILD_ADDED: + eventType = EventType.NodeCreated; + break; + case CHILD_UPDATED: + eventType = EventType.NodeDataChanged; + break; + case CHILD_REMOVED: + eventType = EventType.NodeDeleted; + break; + } + dataListener.dataChanged(event.getData().getPath(), new String(event.getData().getData(), charset), eventType); + } + } } } diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java index e90f7fb9c71..513ea362f7d 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java @@ -20,6 +20,7 @@ import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.remoting.zookeeper.ChildListener; +import org.apache.dubbo.remoting.zookeeper.DataListener; import org.apache.dubbo.remoting.zookeeper.StateListener; import org.apache.dubbo.remoting.zookeeper.ZookeeperClient; @@ -29,7 +30,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; -public abstract class AbstractZookeeperClient implements ZookeeperClient { +public abstract class AbstractZookeeperClient implements ZookeeperClient { protected static final Logger logger = LoggerFactory.getLogger(AbstractZookeeperClient.class); @@ -39,6 +40,8 @@ public abstract class AbstractZookeeperClient implements Zo private final ConcurrentMap> childListeners = new ConcurrentHashMap>(); + private final ConcurrentMap> listeners = new ConcurrentHashMap>(); + private volatile boolean closed = false; public AbstractZookeeperClient(URL url) { @@ -97,6 +100,21 @@ public List addChildListener(String path, final ChildListener listener) return addTargetChildListener(path, targetListener); } + @Override + public void addDataListener(String path, DataListener listener) { + ConcurrentMap dataListenerMap = listeners.get(path); + if (dataListenerMap == null) { + listeners.putIfAbsent(path, new ConcurrentHashMap()); + dataListenerMap = listeners.get(path); + } + TargetDataListener targetListener = dataListenerMap.get(listener); + if (targetListener == null) { + dataListenerMap.putIfAbsent(listener, createTargetDataListener(path, listener)); + targetListener = dataListenerMap.get(listener); + } + addTargetDataListener(path, targetListener); + } + @Override public void removeChildListener(String path, ChildListener listener) { ConcurrentMap listeners = childListeners.get(path); @@ -167,6 +185,10 @@ public String getContent(String path) { protected abstract List addTargetChildListener(String path, TargetChildListener listener); + protected abstract TargetDataListener createTargetDataListener(String path, DataListener listener); + + protected abstract void addTargetDataListener(String path, TargetDataListener listener); + protected abstract void removeTargetChildListener(String path, TargetChildListener listener); protected abstract String doGetContent(String path); diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java index ae8a3ef87b2..a936991f52d 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java @@ -17,6 +17,7 @@ package org.apache.dubbo.remoting.zookeeper.zkclient; import org.I0Itec.zkclient.IZkChildListener; +import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.IZkStateListener; import org.I0Itec.zkclient.ZkClient; import org.apache.dubbo.common.logger.Logger; @@ -25,7 +26,9 @@ import org.apache.zookeeper.Watcher.Event.KeeperState; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; /** @@ -131,6 +134,16 @@ public void unsubscribeChildChanges(String path, IZkChildListener listener) { client.unsubscribeChildChanges(path, listener); } + public void subscribeDataChanges(String path, IZkDataListener listener) { + Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); + client.subscribeDataChanges(path, listener); + } + + public void unsubscribeDataChanges(String path, IZkDataListener dataListener) { + Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); + client.unsubscribeDataChanges(path, dataListener); + } + private void makeClientReady(ZkClient client, Throwable e) { if (e != null) { logger.error("Got an exception when trying to create zkclient instance, can not connect to zookeeper server, please check!", e); diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java index c36640025b9..9920be7ddec 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java @@ -18,6 +18,7 @@ import org.I0Itec.zkclient.IZkChildListener; +import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.IZkStateListener; import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.I0Itec.zkclient.exception.ZkNodeExistsException; @@ -26,13 +27,15 @@ import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.remoting.zookeeper.ChildListener; +import org.apache.dubbo.remoting.zookeeper.DataListener; +import org.apache.dubbo.remoting.zookeeper.EventType; import org.apache.dubbo.remoting.zookeeper.StateListener; import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient; import org.apache.zookeeper.Watcher.Event.KeeperState; import java.util.List; -public class ZkclientZookeeperClient extends AbstractZookeeperClient { +public class ZkclientZookeeperClient extends AbstractZookeeperClient { private Logger logger = LoggerFactory.getLogger(ZkclientZookeeperClient.class); @@ -160,6 +163,27 @@ public List addTargetChildListener(String path, final IZkChildListener l return client.subscribeChildChanges(path, listener); } + @Override + protected IZkDataListener createTargetDataListener(String path, DataListener listener) { + return new IZkDataListener(){ + + @Override + public void handleDataChange(String dataPath, Object data) throws Exception { + listener.dataChanged(dataPath, data, EventType.NodeDataChanged); + } + + @Override + public void handleDataDeleted(String dataPath) throws Exception { + listener.dataChanged(dataPath, null, EventType.NodeDeleted); + } + }; + } + + @Override + protected void addTargetDataListener(String path, IZkDataListener iZkDataListener) { + client.subscribeDataChanges(path, iZkDataListener); + } + @Override public void removeTargetChildListener(String path, IZkChildListener listener) { client.unsubscribeChildChanges(path, listener); From 30c2e95b6fa3cc82a3fdee6d235d7e28a6fa9957 Mon Sep 17 00:00:00 2001 From: cvictory Date: Wed, 6 Mar 2019 21:49:19 +0800 Subject: [PATCH 4/9] unit test --- .../curator/CuratorZookeeperClient.java | 58 +++++++++++-------- ...bo.remoting.zookeeper.ZookeeperTransporter | 3 +- .../curator/CuratorZookeeperClientTest.java | 45 ++++++++++++++ 3 files changed, 80 insertions(+), 26 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java index 701db5d033c..c2449308917 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java @@ -16,16 +16,6 @@ */ package org.apache.dubbo.remoting.zookeeper.curator; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.curator.retry.RetryNTimes; - import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.utils.StringUtils; @@ -35,6 +25,15 @@ import org.apache.dubbo.remoting.zookeeper.StateListener; import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.RetryNTimes; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NodeExistsException; @@ -44,10 +43,10 @@ import java.util.Collections; import java.util.List; -public class CuratorZookeeperClient extends AbstractZookeeperClient { +public class CuratorZookeeperClient extends AbstractZookeeperClient { static final Charset charset = Charset.forName("UTF-8"); - private final CuratorFramework client; + final CuratorFramework client; public CuratorZookeeperClient(URL url) { @@ -103,10 +102,15 @@ public void createEphemeral(String path) { @Override protected void createPersistent(String path, String data) { + byte[] dataBytes = data.getBytes(charset); try { - byte[] dataBytes = data.getBytes(charset); client.create().forPath(path, dataBytes); } catch (NodeExistsException e) { + try { + client.setData().forPath(path, dataBytes); + } catch (Exception e1) { + throw new IllegalStateException(e.getMessage(), e1); + } } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } @@ -114,10 +118,15 @@ protected void createPersistent(String path, String data) { @Override protected void createEphemeral(String path, String data) { + byte[] dataBytes = data.getBytes(charset); try { - byte[] dataBytes = data.getBytes(charset); client.create().withMode(CreateMode.EPHEMERAL).forPath(path, dataBytes); } catch (NodeExistsException e) { + try { + client.setData().forPath(path, dataBytes); + } catch (Exception e1) { + throw new IllegalStateException(e.getMessage(), e1); + } } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } @@ -195,15 +204,16 @@ public List addTargetChildListener(String path, CuratorWatcher listener) } @Override - protected PathChildrenCacheListener createTargetDataListener(String path, DataListener listener) { + protected TreeCacheListener createTargetDataListener(String path, DataListener listener) { return new CuratorWatcherImpl(client, listener); } @Override - protected void addTargetDataListener(String path, PathChildrenCacheListener pathChildrenCacheListener) { + protected void addTargetDataListener(String path, TreeCacheListener treeCacheListener) { try { - PathChildrenCache pathcache = new PathChildrenCache(client, "/some/path", true); - pathcache.getListenable().addListener(pathChildrenCacheListener); + TreeCache treeCache = new TreeCache(client, path); + treeCache.start(); + treeCache.getListenable().addListener(treeCacheListener); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } @@ -214,7 +224,7 @@ public void removeTargetChildListener(String path, CuratorWatcher listener) { ((CuratorWatcherImpl) listener).unwatch(); } - static class CuratorWatcherImpl implements CuratorWatcher, PathChildrenCacheListener { + static class CuratorWatcherImpl implements CuratorWatcher, TreeCacheListener { private CuratorFramework client; private volatile ChildListener childListener; @@ -249,18 +259,18 @@ public void process(WatchedEvent event) throws Exception { } @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { + public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { if (dataListener != null) { - PathChildrenCacheEvent.Type type = event.getType(); + TreeCacheEvent.Type type = event.getType(); EventType eventType = null; switch (type) { - case CHILD_ADDED: + case NODE_ADDED: eventType = EventType.NodeCreated; break; - case CHILD_UPDATED: + case NODE_UPDATED: eventType = EventType.NodeDataChanged; break; - case CHILD_REMOVED: + case NODE_REMOVED: eventType = EventType.NodeDeleted; break; } diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter index e9b9349f339..f8cbd5b417d 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter @@ -1,2 +1 @@ -zkclient=org.apache.dubbo.remoting.zookeeper.zkclient.ZkclientZookeeperTransporter -curator=org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperTransporter \ No newline at end of file +curator=org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperTransporter diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java index cb89b166c1c..97ee52d7120 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java @@ -20,7 +20,16 @@ import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.remoting.zookeeper.ChildListener; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingServer; import org.apache.zookeeper.WatchedEvent; import org.junit.jupiter.api.AfterEach; @@ -31,6 +40,7 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; @@ -41,6 +51,7 @@ public class CuratorZookeeperClientTest { private TestingServer zkServer; private CuratorZookeeperClient curatorClient; + CuratorFramework client = null; @BeforeEach public void setUp() throws Exception { @@ -48,6 +59,8 @@ public void setUp() throws Exception { zkServer = new TestingServer(zkServerPort, true); curatorClient = new CuratorZookeeperClient(URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort + "/org.apache.dubbo.registry.RegistryService")); + client = CuratorFrameworkFactory.newClient(zkServer.getConnectString(), new ExponentialBackoffRetry(1000, 3)); + client.start(); } @Test @@ -153,4 +166,36 @@ public void tearDown() throws Exception { curatorClient.close(); zkServer.stop(); } + + @Test + public void testAddDataListener() throws Exception { + String listenerPath = "/dubbo/service.name/configuration"; + String path = listenerPath + "/dat/data"; + String value = "vav"; + +// curatorClient.create(path, true); + curatorClient.create(path + "/d.json", value, true); + String valueFromCache = curatorClient.getContent(path + "/d.json"); + Assertions.assertEquals(value, valueFromCache); + final AtomicInteger atomicInteger = new AtomicInteger(0); + curatorClient.addTargetDataListener(listenerPath, new TreeCacheListener() { + @Override + public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { + System.out.println("===" + event); + atomicInteger.incrementAndGet(); + } + }); + + //curatorClient.delete(path + "/d.json"); + valueFromCache = curatorClient.getContent(path + "/d.json"); + Assertions.assertNotNull(valueFromCache); + curatorClient.client.setData().forPath(path + "/d.json", "sdsdf".getBytes()); + curatorClient.client.setData().forPath(path + "/d.json", "dfsasf".getBytes()); + curatorClient.delete(path + "/d.json"); + curatorClient.delete(path); + valueFromCache = curatorClient.getContent(path + "/d.json"); + Assertions.assertNull(valueFromCache); + Thread.sleep(2000l); + Assertions.assertTrue(8l >= atomicInteger.get()); + } } From 0f830de69dbd3db2423057405bd64f710c61349f Mon Sep 17 00:00:00 2001 From: cvictory Date: Thu, 7 Mar 2019 10:26:47 +0800 Subject: [PATCH 5/9] unit test --- .../zookeeper/curator/CuratorZookeeperClient.java | 9 ++++++++- .../zookeeper/curator/CuratorZookeeperClientTest.java | 9 ++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java index c2449308917..61caa25a93f 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java @@ -46,7 +46,7 @@ public class CuratorZookeeperClient extends AbstractZookeeperClient { static final Charset charset = Charset.forName("UTF-8"); - final CuratorFramework client; + private final CuratorFramework client; public CuratorZookeeperClient(URL url) { @@ -279,4 +279,11 @@ public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exc } } + /** + * just for unit test + * @return + */ + CuratorFramework getClient() { + return client; + } } diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java index 97ee52d7120..1c42116e3d2 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java @@ -168,12 +168,11 @@ public void tearDown() throws Exception { } @Test - public void testAddDataListener() throws Exception { + public void testAddTargetDataListener() throws Exception { String listenerPath = "/dubbo/service.name/configuration"; String path = listenerPath + "/dat/data"; String value = "vav"; -// curatorClient.create(path, true); curatorClient.create(path + "/d.json", value, true); String valueFromCache = curatorClient.getContent(path + "/d.json"); Assertions.assertEquals(value, valueFromCache); @@ -186,16 +185,16 @@ public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exc } }); - //curatorClient.delete(path + "/d.json"); valueFromCache = curatorClient.getContent(path + "/d.json"); Assertions.assertNotNull(valueFromCache); - curatorClient.client.setData().forPath(path + "/d.json", "sdsdf".getBytes()); - curatorClient.client.setData().forPath(path + "/d.json", "dfsasf".getBytes()); + curatorClient.getClient().setData().forPath(path + "/d.json", "sdsdf".getBytes()); + curatorClient.getClient().setData().forPath(path + "/d.json", "dfsasf".getBytes()); curatorClient.delete(path + "/d.json"); curatorClient.delete(path); valueFromCache = curatorClient.getContent(path + "/d.json"); Assertions.assertNull(valueFromCache); Thread.sleep(2000l); Assertions.assertTrue(8l >= atomicInteger.get()); + Assertions.assertTrue(2l <= atomicInteger.get()); } } From 3ee395417a63f82fc895f902de1ba2a7f439b627 Mon Sep 17 00:00:00 2001 From: cvictory Date: Thu, 7 Mar 2019 10:58:48 +0800 Subject: [PATCH 6/9] fix #3288. remove some unused code and unit test --- .../ZookeeperDynamicConfiguration.java | 7 +- .../remoting/zookeeper/ZookeeperClient.java | 10 +- .../curator/CuratorZookeeperClient.java | 26 ++- .../support/AbstractZookeeperClient.java | 18 ++ .../zookeeper/zkclient/ZkClientWrapper.java | 157 -------------- .../zkclient/ZkclientZookeeperClient.java | 192 ------------------ .../ZkclientZookeeperTransporter.java | 29 --- .../curator/CuratorZookeeperClientTest.java | 8 +- .../zkclient/ZkClientWrapperTest.java | 56 ----- .../zkclient/ZkclientZookeeperClientTest.java | 140 ------------- .../ZkclientZookeeperTransporterTest.java | 53 ----- 11 files changed, 56 insertions(+), 640 deletions(-) delete mode 100644 dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java delete mode 100644 dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java delete mode 100644 dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporter.java delete mode 100644 dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapperTest.java delete mode 100644 dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClientTest.java delete mode 100644 dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporterTest.java diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java index bf74e8dc4db..6334463f23d 100644 --- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java +++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java @@ -17,6 +17,7 @@ package org.apache.dubbo.configcenter.support.zookeeper; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.utils.NamedThreadFactory; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.configcenter.ConfigurationListener; import org.apache.dubbo.configcenter.DynamicConfiguration; @@ -27,6 +28,8 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY; @@ -36,6 +39,7 @@ public class ZookeeperDynamicConfiguration implements DynamicConfiguration { private static final Logger logger = LoggerFactory.getLogger(ZookeeperDynamicConfiguration.class); + private Executor executor; // The final root path would be: /configRootPath/"config" private String rootPath; private final ZookeeperClient zkClient; @@ -51,9 +55,10 @@ public class ZookeeperDynamicConfiguration implements DynamicConfiguration { initializedLatch = new CountDownLatch(1); this.cacheListener = new CacheListener(rootPath, initializedLatch); + this.executor = Executors.newFixedThreadPool(1, new NamedThreadFactory(this.getClass().getSimpleName(), true)); zkClient = zookeeperTransporter.connect(url); - zkClient.addDataListener(rootPath, cacheListener); + zkClient.addDataListener(rootPath, cacheListener, executor); } /** diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java index 7e6f96fa562..911c728d168 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java @@ -19,6 +19,7 @@ import org.apache.dubbo.common.URL; import java.util.List; +import java.util.concurrent.Executor; public interface ZookeeperClient { @@ -31,11 +32,18 @@ public interface ZookeeperClient { List addChildListener(String path, ChildListener listener); /** - * @param path: directory. All of child of path will be listened. + * @param path: directory. All of child of path will be listened. * @param listener */ void addDataListener(String path, DataListener listener); + /** + * @param path: directory. All of child of path will be listened. + * @param listener + * @param executor another thread + */ + void addDataListener(String path, DataListener listener, Executor executor); + void removeChildListener(String path, ChildListener listener); void addStateListener(StateListener listener); diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java index 61caa25a93f..90b68266656 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java @@ -42,8 +42,9 @@ import java.nio.charset.Charset; import java.util.Collections; import java.util.List; +import java.util.concurrent.Executor; -public class CuratorZookeeperClient extends AbstractZookeeperClient { +public class CuratorZookeeperClient extends AbstractZookeeperClient { static final Charset charset = Charset.forName("UTF-8"); private final CuratorFramework client; @@ -193,7 +194,7 @@ public CuratorZookeeperClient.CuratorWatcherImpl createTargetChildListener(Strin } @Override - public List addTargetChildListener(String path, CuratorWatcher listener) { + public List addTargetChildListener(String path, CuratorWatcherImpl listener) { try { return client.getChildren().usingWatcher(listener).forPath(path); } catch (NoNodeException e) { @@ -215,13 +216,24 @@ protected void addTargetDataListener(String path, TreeCacheListener treeCacheLis treeCache.start(); treeCache.getListenable().addListener(treeCacheListener); } catch (Exception e) { - throw new IllegalStateException(e.getMessage(), e); + throw new IllegalStateException("Add treeCache listener for path:" + path, e); } } @Override - public void removeTargetChildListener(String path, CuratorWatcher listener) { - ((CuratorWatcherImpl) listener).unwatch(); + protected void addTargetDataListener(String path, TreeCacheListener treeCacheListener, Executor executor) { + try { + TreeCache treeCache = new TreeCache(client, path); + treeCache.start(); + treeCache.getListenable().addListener(treeCacheListener, executor); + } catch (Exception e) { + throw new IllegalStateException("Add treeCache listener for path:" + path, e); + } + } + + @Override + public void removeTargetChildListener(String path, CuratorWatcherImpl listener) { + listener.unwatch(); } static class CuratorWatcherImpl implements CuratorWatcher, TreeCacheListener { @@ -240,6 +252,9 @@ public CuratorWatcherImpl(CuratorFramework client, DataListener dataListener) { this.dataListener = dataListener; } + protected CuratorWatcherImpl(){ + } + public void unwatch() { this.childListener = null; } @@ -281,6 +296,7 @@ public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exc /** * just for unit test + * * @return */ CuratorFramework getClient() { diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java index 513ea362f7d..19fea059666 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java @@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.Executor; public abstract class AbstractZookeeperClient implements ZookeeperClient { @@ -115,6 +116,21 @@ public void addDataListener(String path, DataListener listener) { addTargetDataListener(path, targetListener); } + @Override + public void addDataListener(String path, DataListener listener, Executor executor) { + ConcurrentMap dataListenerMap = listeners.get(path); + if (dataListenerMap == null) { + listeners.putIfAbsent(path, new ConcurrentHashMap()); + dataListenerMap = listeners.get(path); + } + TargetDataListener targetListener = dataListenerMap.get(listener); + if (targetListener == null) { + dataListenerMap.putIfAbsent(listener, createTargetDataListener(path, listener)); + targetListener = dataListenerMap.get(listener); + } + addTargetDataListener(path, targetListener, executor); + } + @Override public void removeChildListener(String path, ChildListener listener) { ConcurrentMap listeners = childListeners.get(path); @@ -189,6 +205,8 @@ public String getContent(String path) { protected abstract void addTargetDataListener(String path, TargetDataListener listener); + protected abstract void addTargetDataListener(String path, TargetDataListener listener, Executor executor); + protected abstract void removeTargetChildListener(String path, TargetChildListener listener); protected abstract String doGetContent(String path); diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java deleted file mode 100644 index a936991f52d..00000000000 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapper.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.remoting.zookeeper.zkclient; - -import org.I0Itec.zkclient.IZkChildListener; -import org.I0Itec.zkclient.IZkDataListener; -import org.I0Itec.zkclient.IZkStateListener; -import org.I0Itec.zkclient.ZkClient; -import org.apache.dubbo.common.logger.Logger; -import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.common.utils.Assert; -import org.apache.zookeeper.Watcher.Event.KeeperState; - -import java.util.List; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.TimeUnit; - -/** - * Zkclient wrapper class that can monitor the state of the connection automatically after the connection is out of time - * It is also consistent with the use of curator - * - * @date 2017/10/29 - */ -public class ZkClientWrapper { - private Logger logger = LoggerFactory.getLogger(ZkClientWrapper.class); - private long timeout; - private ZkClient client; - private volatile KeeperState state; - private CompletableFuture completableFuture; - private volatile boolean started = false; - - public ZkClientWrapper(final String serverAddr, long timeout) { - this.timeout = timeout; - completableFuture = CompletableFuture.supplyAsync(() -> new ZkClient(serverAddr, Integer.MAX_VALUE)); - } - - public void start() { - if (!started) { - try { - client = completableFuture.get(timeout, TimeUnit.MILLISECONDS); -// this.client.subscribeStateChanges(IZkStateListener); - } catch (Throwable t) { - logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!", t); - completableFuture.whenComplete(this::makeClientReady); - } - started = true; - } else { - logger.warn("Zkclient has already been started!"); - } - } - - public void addListener(IZkStateListener listener) { - completableFuture.whenComplete((value, exception) -> { - this.makeClientReady(value, exception); - if (exception == null) { - client.subscribeStateChanges(listener); - } - }); - } - - public boolean isConnected() { -// return client != null && state == KeeperState.SyncConnected; - return client != null; - } - - public void createPersistent(String path) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - client.createPersistent(path, true); - } - - public void createEphemeral(String path) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - client.createEphemeral(path); - } - - public void createPersistent(String path, String data) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - client.createPersistent(path, data); - } - - public void createEphemeral(String path, String data) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - client.createEphemeral(path, data); - } - - public void delete(String path) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - client.delete(path); - } - - public List getChildren(String path) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - return client.getChildren(path); - } - - public String getData(String path) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - return client.readData(path); - } - - public boolean exists(String path) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - return client.exists(path); - } - - public void close() { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - client.close(); - } - - public List subscribeChildChanges(String path, final IZkChildListener listener) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - return client.subscribeChildChanges(path, listener); - } - - public void unsubscribeChildChanges(String path, IZkChildListener listener) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - client.unsubscribeChildChanges(path, listener); - } - - public void subscribeDataChanges(String path, IZkDataListener listener) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - client.subscribeDataChanges(path, listener); - } - - public void unsubscribeDataChanges(String path, IZkDataListener dataListener) { - Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!")); - client.unsubscribeDataChanges(path, dataListener); - } - - private void makeClientReady(ZkClient client, Throwable e) { - if (e != null) { - logger.error("Got an exception when trying to create zkclient instance, can not connect to zookeeper server, please check!", e); - } else { - this.client = client; -// this.client.subscribeStateChanges(IZkStateListener); - } - } - - -} diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java deleted file mode 100644 index 9920be7ddec..00000000000 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClient.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.remoting.zookeeper.zkclient; - - -import org.I0Itec.zkclient.IZkChildListener; -import org.I0Itec.zkclient.IZkDataListener; -import org.I0Itec.zkclient.IZkStateListener; -import org.I0Itec.zkclient.exception.ZkNoNodeException; -import org.I0Itec.zkclient.exception.ZkNodeExistsException; -import org.apache.dubbo.common.Constants; -import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.logger.Logger; -import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.remoting.zookeeper.ChildListener; -import org.apache.dubbo.remoting.zookeeper.DataListener; -import org.apache.dubbo.remoting.zookeeper.EventType; -import org.apache.dubbo.remoting.zookeeper.StateListener; -import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient; -import org.apache.zookeeper.Watcher.Event.KeeperState; - -import java.util.List; - -public class ZkclientZookeeperClient extends AbstractZookeeperClient { - - private Logger logger = LoggerFactory.getLogger(ZkclientZookeeperClient.class); - - private final ZkClientWrapper client; - - private volatile KeeperState state = KeeperState.SyncConnected; - - public ZkclientZookeeperClient(URL url) { - super(url); - long timeout = url.getParameter(Constants.TIMEOUT_KEY, 30000L); - client = new ZkClientWrapper(url.getBackupAddress(), timeout); - client.addListener(new IZkStateListener() { - @Override - public void handleStateChanged(KeeperState state) throws Exception { - ZkclientZookeeperClient.this.state = state; - if (state == KeeperState.Disconnected) { - stateChanged(StateListener.DISCONNECTED); - } else if (state == KeeperState.SyncConnected) { - stateChanged(StateListener.CONNECTED); - } - } - - @Override - public void handleNewSession() throws Exception { - stateChanged(StateListener.RECONNECTED); - } - }); - client.start(); - } - - @Override - public void createPersistent(String path) { - try { - client.createPersistent(path); - } catch (ZkNodeExistsException e) { - logger.error("zookeeper failed to create persistent node with " + path + ": ", e); - } - } - - @Override - public void createEphemeral(String path) { - try { - client.createEphemeral(path); - } catch (ZkNodeExistsException e) { - logger.error("zookeeper failed to create ephemeral node with " + path + ": ", e); - } - } - - @Override - protected void createPersistent(String path, String data) { - try { - client.createPersistent(path, data); - } catch (ZkNodeExistsException e) { - logger.error("zookeeper failed to create persistent node with " + - path + " and " + data + " : ", e); - } - } - - @Override - protected void createEphemeral(String path, String data) { - try { - client.createEphemeral(path, data); - } catch (ZkNodeExistsException e) { - logger.error("zookeeper failed to create ephemeral node with " + - path + " and " + data + " : ", e); - } - } - - @Override - public void delete(String path) { - try { - client.delete(path); - } catch (ZkNoNodeException e) { - logger.error("zookeeper failed to delete node with " + path + ": ", e); - } - } - - @Override - public List getChildren(String path) { - try { - return client.getChildren(path); - } catch (ZkNoNodeException e) { - logger.error("zookeeper failed to get children node with " + path + ": ", e); - return null; - } - } - - @Override - public boolean checkExists(String path) { - try { - return client.exists(path); - } catch (Throwable t) { - logger.error("zookeeper failed to check node existing with " + path + ": ", t); - } - return false; - } - - @Override - public boolean isConnected() { - return state == KeeperState.SyncConnected; - } - - @Override - public String doGetContent(String path) { - try { - return client.getData(path); - } catch (ZkNoNodeException e) { - logger.error("zookeeper failed to get data with " + path + ": ", e); - return null; - } - } - - @Override - public void doClose() { - client.close(); - } - - @Override - public IZkChildListener createTargetChildListener(String path, final ChildListener listener) { - return listener::childChanged; - } - - @Override - public List addTargetChildListener(String path, final IZkChildListener listener) { - return client.subscribeChildChanges(path, listener); - } - - @Override - protected IZkDataListener createTargetDataListener(String path, DataListener listener) { - return new IZkDataListener(){ - - @Override - public void handleDataChange(String dataPath, Object data) throws Exception { - listener.dataChanged(dataPath, data, EventType.NodeDataChanged); - } - - @Override - public void handleDataDeleted(String dataPath) throws Exception { - listener.dataChanged(dataPath, null, EventType.NodeDeleted); - } - }; - } - - @Override - protected void addTargetDataListener(String path, IZkDataListener iZkDataListener) { - client.subscribeDataChanges(path, iZkDataListener); - } - - @Override - public void removeTargetChildListener(String path, IZkChildListener listener) { - client.unsubscribeChildChanges(path, listener); - } - -} diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporter.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporter.java deleted file mode 100644 index 0ad86ff7850..00000000000 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporter.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.remoting.zookeeper.zkclient; - -import org.apache.dubbo.common.URL; -import org.apache.dubbo.remoting.zookeeper.ZookeeperClient; -import org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperTransporter; - -public class ZkclientZookeeperTransporter extends AbstractZookeeperTransporter { - @Override - public ZookeeperClient createZookeeperClient(URL url) { - return new ZkclientZookeeperClient(url); - } - -} diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java index 1c42116e3d2..0fb1295cd62 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java @@ -22,11 +22,6 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; -import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; @@ -87,7 +82,8 @@ public void testChildrenListener() throws InterruptedException { String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers"; curatorClient.create(path, false); final CountDownLatch countDownLatch = new CountDownLatch(1); - curatorClient.addTargetChildListener(path, new CuratorWatcher() { + curatorClient.addTargetChildListener(path, new CuratorZookeeperClient.CuratorWatcherImpl() { + @Override public void process(WatchedEvent watchedEvent) throws Exception { countDownLatch.countDown(); diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapperTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapperTest.java deleted file mode 100644 index 629c0e9f772..00000000000 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkClientWrapperTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.remoting.zookeeper.zkclient; - -import org.apache.dubbo.common.utils.NetUtils; -import org.I0Itec.zkclient.IZkChildListener; -import org.apache.curator.test.TestingServer; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Mockito.mock; - -public class ZkClientWrapperTest { - private TestingServer zkServer; - private ZkClientWrapper zkClientWrapper; - - @BeforeEach - public void setUp() throws Exception { - int zkServerPort = NetUtils.getAvailablePort(); - zkServer = new TestingServer(zkServerPort, true); - zkClientWrapper = new ZkClientWrapper("127.0.0.1:" + zkServerPort, 10000); - } - - @AfterEach - public void tearDown() throws Exception { - zkServer.stop(); - } - - @Test - public void testConnectedStatus() { - boolean connected = zkClientWrapper.isConnected(); - assertThat(connected, is(false)); - zkClientWrapper.start(); - - IZkChildListener listener = mock(IZkChildListener.class); - zkClientWrapper.subscribeChildChanges("/path", listener); - zkClientWrapper.unsubscribeChildChanges("/path", listener); - } -} \ No newline at end of file diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClientTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClientTest.java deleted file mode 100644 index 73c402a4b2c..00000000000 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperClientTest.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.remoting.zookeeper.zkclient; - -import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.utils.NetUtils; -import org.apache.dubbo.remoting.zookeeper.StateListener; -import org.I0Itec.zkclient.IZkChildListener; -import org.apache.curator.test.TestingServer; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CountDownLatch; - -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.core.Is.is; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.hamcrest.MatcherAssert.assertThat; - -public class ZkclientZookeeperClientTest { - private TestingServer zkServer; - private ZkclientZookeeperClient zkclientZookeeperClient; - - @BeforeEach - public void setUp() throws Exception { - int zkServerPort = NetUtils.getAvailablePort(); - zkServer = new TestingServer(zkServerPort, true); - zkclientZookeeperClient = new ZkclientZookeeperClient(URL.valueOf("zookeeper://127.0.0.1:" + - zkServerPort + "/org.apache.dubbo.registry.RegistryService")); - } - - @Test - public void testCheckExists() { - String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers"; - zkclientZookeeperClient.create(path, false); - assertThat(zkclientZookeeperClient.checkExists(path), is(true)); - assertThat(zkclientZookeeperClient.checkExists(path + "/noneexits"), is(false)); - } - - @Test - public void testDeletePath() { - String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers"; - zkclientZookeeperClient.create(path, false); - assertThat(zkclientZookeeperClient.checkExists(path), is(true)); - - zkclientZookeeperClient.delete(path); - assertThat(zkclientZookeeperClient.checkExists(path), is(false)); - } - - @Test - public void testConnectState() throws Exception { - assertThat(zkclientZookeeperClient.isConnected(), is(true)); - final CountDownLatch stopLatch = new CountDownLatch(1); - zkclientZookeeperClient.addStateListener(new StateListener() { - @Override - public void stateChanged(int connected) { - stopLatch.countDown(); - } - }); - zkServer.stop(); - stopLatch.await(); - assertThat(zkclientZookeeperClient.isConnected(), is(false)); - } - - @Test - public void testChildrenListener() throws InterruptedException { - String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers"; - zkclientZookeeperClient.create(path, false); - final CountDownLatch countDownLatch = new CountDownLatch(1); - zkclientZookeeperClient.addTargetChildListener(path, new IZkChildListener() { - @Override - public void handleChildChange(String s, List list) throws Exception { - countDownLatch.countDown(); - } - }); - zkclientZookeeperClient.createPersistent(path + "/provider1"); - countDownLatch.await(); - } - - @Test - public void testGetChildren() throws IOException { - String path = "/dubbo/org.apache.dubbo.demo.DemoService/parentProviders"; - zkclientZookeeperClient.create(path, false); - for (int i = 0; i < 5; i++) { - zkclientZookeeperClient.createEphemeral(path + "/server" + i); - } - List zookeeperClientChildren = zkclientZookeeperClient.getChildren(path); - assertThat(zookeeperClientChildren, hasSize(5)); - } - - @Test - public void testCreateContentPersistent() { - String path = "/ZkclientZookeeperClient/content.data"; - String content = "createContentTest"; - zkclientZookeeperClient.delete(path); - assertThat(zkclientZookeeperClient.checkExists(path), is(false)); - assertNull(zkclientZookeeperClient.getContent(path)); - - zkclientZookeeperClient.create(path, content, false); - assertThat(zkclientZookeeperClient.checkExists(path), is(true)); - assertEquals(zkclientZookeeperClient.getContent(path), content); - } - - @Test - public void testCreateContentTem() { - String path = "/ZkclientZookeeperClient/content.data"; - String content = "createContentTest"; - zkclientZookeeperClient.delete(path); - assertThat(zkclientZookeeperClient.checkExists(path), is(false)); - assertNull(zkclientZookeeperClient.getContent(path)); - - zkclientZookeeperClient.create(path, content, true); - assertThat(zkclientZookeeperClient.checkExists(path), is(true)); - assertEquals(zkclientZookeeperClient.getContent(path), content); - } - - @AfterEach - public void tearDown() throws Exception { - zkclientZookeeperClient.close(); - zkServer.stop(); - } -} diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporterTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporterTest.java deleted file mode 100644 index cbadda97aa5..00000000000 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/zkclient/ZkclientZookeeperTransporterTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.remoting.zookeeper.zkclient; - -import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.utils.NetUtils; -import org.apache.dubbo.remoting.zookeeper.ZookeeperClient; -import org.apache.curator.test.TestingServer; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.IsNot.not; -import static org.hamcrest.core.IsNull.nullValue; - -public class ZkclientZookeeperTransporterTest { - private TestingServer zkServer; - private ZookeeperClient zookeeperClient; - - @BeforeEach - public void setUp() throws Exception { - int zkServerPort = NetUtils.getAvailablePort(); - zkServer = new TestingServer(zkServerPort, true); - zookeeperClient = new ZkclientZookeeperTransporter().connect(URL.valueOf("zookeeper://127.0.0.1:" + - zkServerPort + "/service")); - } - - @Test - public void testZookeeperClient() { - assertThat(zookeeperClient, not(nullValue())); - zookeeperClient.close(); - } - - @AfterEach - public void tearDown() throws Exception { - zkServer.stop(); - } -} \ No newline at end of file From 1e6b59d494f517185d0cf8e5a0a7f045a1193716 Mon Sep 17 00:00:00 2001 From: cvictory Date: Thu, 7 Mar 2019 11:22:43 +0800 Subject: [PATCH 7/9] #3288. remove zkclient pom, fix rat issue --- dubbo-dependencies-bom/pom.xml | 6 ------ .../dubbo-remoting-zookeeper/pom.xml | 4 ---- .../dubbo/remoting/zookeeper/DataListener.java | 18 ++++++++++++++++-- .../dubbo/remoting/zookeeper/EventType.java | 16 ++++++++++++++++ 4 files changed, 32 insertions(+), 12 deletions(-) diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml index 37d2a068e15..b50cf77ed19 100644 --- a/dubbo-dependencies-bom/pom.xml +++ b/dubbo-dependencies-bom/pom.xml @@ -83,7 +83,6 @@ 4.4.6 1.2.46 3.4.13 - 0.2 4.0.1 2.12.0 2.9.0 @@ -181,11 +180,6 @@ zookeeper ${zookeeper_version} - - com.101tec - zkclient - ${zkclient_version} - org.apache.curator curator-framework diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml b/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml index 587f1822ceb..6533c09f414 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml +++ b/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml @@ -38,10 +38,6 @@ org.apache.zookeeper zookeeper - - com.101tec - zkclient - org.apache.curator curator-framework diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/DataListener.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/DataListener.java index a9400e86ffd..0c61dcc6eac 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/DataListener.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/DataListener.java @@ -1,7 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.dubbo.remoting.zookeeper; -import java.util.List; - /** * @author cvictory ON 2019-02-26 */ diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java index 4ffba571c50..dd6f0672521 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.dubbo.remoting.zookeeper; import org.apache.zookeeper.Watcher; From 30f2c3253286b634eb5e5ca1119bfe328917e3bb Mon Sep 17 00:00:00 2001 From: cvictory Date: Thu, 7 Mar 2019 12:57:09 +0800 Subject: [PATCH 8/9] unit test --- .../remoting/zookeeper/curator/CuratorZookeeperClientTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java index 0fb1295cd62..a772de5f4bd 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java @@ -190,7 +190,7 @@ public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exc valueFromCache = curatorClient.getContent(path + "/d.json"); Assertions.assertNull(valueFromCache); Thread.sleep(2000l); - Assertions.assertTrue(8l >= atomicInteger.get()); + Assertions.assertTrue(9l >= atomicInteger.get()); Assertions.assertTrue(2l <= atomicInteger.get()); } } From 3ff8235f9382b1464b50e30cf41c2d1475a89918 Mon Sep 17 00:00:00 2001 From: cvictory Date: Tue, 12 Mar 2019 16:00:46 +0800 Subject: [PATCH 9/9] fix #3288. make remote-zookeeper depend on dependencies-zookeeper; fix unit test and issue --- .../dubbo/configcenter/ConfigChangeEvent.java | 10 ++++- .../support/zookeeper/CacheListener.java | 6 ++- .../ZookeeperDynamicConfiguration.java | 6 +++ .../ZookeeperDynamicConfigurationTest.java | 1 + .../dubbo-remoting-zookeeper/pom.xml | 14 ++----- .../dubbo/remoting/zookeeper/EventType.java | 8 +++- .../remoting/zookeeper/ZookeeperClient.java | 2 + .../curator/CuratorZookeeperClient.java | 40 ++++++++++++++++--- .../support/AbstractZookeeperClient.java | 25 +++++++----- .../curator/CuratorZookeeperClientTest.java | 2 +- 10 files changed, 84 insertions(+), 30 deletions(-) diff --git a/dubbo-configcenter/dubbo-configcenter-api/src/main/java/org/apache/dubbo/configcenter/ConfigChangeEvent.java b/dubbo-configcenter/dubbo-configcenter-api/src/main/java/org/apache/dubbo/configcenter/ConfigChangeEvent.java index 4a2190a4796..cdedd15e34e 100644 --- a/dubbo-configcenter/dubbo-configcenter-api/src/main/java/org/apache/dubbo/configcenter/ConfigChangeEvent.java +++ b/dubbo-configcenter/dubbo-configcenter-api/src/main/java/org/apache/dubbo/configcenter/ConfigChangeEvent.java @@ -49,4 +49,12 @@ public ConfigChangeType getChangeType() { return changeType; } -} \ No newline at end of file + @Override + public String toString() { + return "ConfigChangeEvent{" + + "key='" + key + '\'' + + ", value='" + value + '\'' + + ", changeType=" + changeType + + '}'; + } +} diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java index bd653ef6d09..4f6c6382952 100644 --- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java +++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/CacheListener.java @@ -74,11 +74,15 @@ private String pathToKey(String path) { @Override public void dataChanged(String path, Object value, EventType eventType) { if (eventType == null) { + return; + } + + if (eventType == EventType.INITIALIZED) { initializedLatch.countDown(); return; } - if (value == null && eventType != EventType.NodeDeleted) { + if (path == null || (value == null && eventType != EventType.NodeDeleted)) { return; } diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java index 6334463f23d..dac49ccb0e6 100644 --- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java +++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/main/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfiguration.java @@ -59,6 +59,12 @@ public class ZookeeperDynamicConfiguration implements DynamicConfiguration { zkClient = zookeeperTransporter.connect(url); zkClient.addDataListener(rootPath, cacheListener, executor); + try { + // Wait for connection + this.initializedLatch.await(); + } catch (InterruptedException e) { + logger.warn("Failed to build local cache for config center (zookeeper)." + url); + } } /** diff --git a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java index e1ca40f8040..40f9f04a95a 100644 --- a/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java +++ b/dubbo-configcenter/dubbo-configcenter-zookeeper/src/test/java/org/apache/dubbo/configcenter/support/zookeeper/ZookeeperDynamicConfigurationTest.java @@ -133,6 +133,7 @@ public TestListener(CountDownLatch latch) { @Override public void process(ConfigChangeEvent event) { + System.out.println(this + ": " + event); Integer count = countMap.computeIfAbsent(event.getKey(), k -> new Integer(0)); countMap.put(event.getKey(), ++count); diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml b/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml index 6533c09f414..24b14e9bd7e 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml +++ b/dubbo-remoting/dubbo-remoting-zookeeper/pom.xml @@ -35,16 +35,10 @@ ${project.parent.version} - org.apache.zookeeper - zookeeper - - - org.apache.curator - curator-framework - - - org.apache.curator - curator-recipes + org.apache.dubbo + dubbo-dependencies-zookeeper + ${project.parent.version} + pom org.apache.curator diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java index dd6f0672521..a1de0373652 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/EventType.java @@ -26,7 +26,13 @@ public enum EventType { NodeCreated(1), NodeDeleted(2), NodeDataChanged(3), - NodeChildrenChanged(4); + NodeChildrenChanged(4), + CONNECTION_SUSPENDED(11), + CONNECTION_RECONNECTED(12), + CONNECTION_LOST(12), + INITIALIZED(10); + + private final int intValue; // Integer representation of value // for sending over wire diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java index 911c728d168..cbb37479cd3 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/ZookeeperClient.java @@ -44,6 +44,8 @@ public interface ZookeeperClient { */ void addDataListener(String path, DataListener listener, Executor executor); + void removeDataListener(String path, DataListener listener); + void removeChildListener(String path, ChildListener listener); void addStateListener(StateListener listener); diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java index c0b6b84baf0..4bf7b6d3bf5 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClient.java @@ -42,12 +42,15 @@ import java.nio.charset.Charset; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; -public class CuratorZookeeperClient extends AbstractZookeeperClient { +public class CuratorZookeeperClient extends AbstractZookeeperClient { static final Charset charset = Charset.forName("UTF-8"); private final CuratorFramework client; + private Map treeCacheMap = new ConcurrentHashMap<>(); public CuratorZookeeperClient(URL url) { @@ -205,19 +208,20 @@ public List addTargetChildListener(String path, CuratorWatcherImpl liste } @Override - protected TreeCacheListener createTargetDataListener(String path, DataListener listener) { + protected CuratorZookeeperClient.CuratorWatcherImpl createTargetDataListener(String path, DataListener listener) { return new CuratorWatcherImpl(client, listener); } @Override - protected void addTargetDataListener(String path, TreeCacheListener treeCacheListener) { + protected void addTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener) { this.addTargetDataListener(path, treeCacheListener, null); } @Override - protected void addTargetDataListener(String path, TreeCacheListener treeCacheListener, Executor executor) { + protected void addTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener, Executor executor) { try { TreeCache treeCache = TreeCache.newBuilder(client, path).setCacheData(false).build(); + treeCacheMap.putIfAbsent(path, treeCache); treeCache.start(); if (executor == null) { treeCache.getListenable().addListener(treeCacheListener); @@ -229,6 +233,15 @@ protected void addTargetDataListener(String path, TreeCacheListener treeCacheLis } } + @Override + protected void removeTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener) { + TreeCache treeCache = treeCacheMap.get(path); + if (treeCache != null) { + treeCache.getListenable().removeListener(treeCacheListener); + } + treeCacheListener.dataListener = null; + } + @Override public void removeTargetChildListener(String path, CuratorWatcherImpl listener) { listener.unwatch(); @@ -277,20 +290,37 @@ public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exc TreeCacheEvent.Type type = event.getType(); EventType eventType = null; String content = null; + String path = null; switch (type) { case NODE_ADDED: eventType = EventType.NodeCreated; + path = event.getData().getPath(); content = new String(event.getData().getData(), charset); break; case NODE_UPDATED: eventType = EventType.NodeDataChanged; + path = event.getData().getPath(); content = new String(event.getData().getData(), charset); break; case NODE_REMOVED: + path = event.getData().getPath(); eventType = EventType.NodeDeleted; break; + case INITIALIZED: + eventType = EventType.INITIALIZED; + break; + case CONNECTION_LOST: + eventType = EventType.CONNECTION_LOST; + break; + case CONNECTION_RECONNECTED: + eventType = EventType.CONNECTION_RECONNECTED; + break; + case CONNECTION_SUSPENDED: + eventType = EventType.CONNECTION_SUSPENDED; + break; + } - dataListener.dataChanged(event.getData().getPath(), content, eventType); + dataListener.dataChanged(path, content, eventType); } } } diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java index 19fea059666..9697cea012e 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/main/java/org/apache/dubbo/remoting/zookeeper/support/AbstractZookeeperClient.java @@ -103,17 +103,7 @@ public List addChildListener(String path, final ChildListener listener) @Override public void addDataListener(String path, DataListener listener) { - ConcurrentMap dataListenerMap = listeners.get(path); - if (dataListenerMap == null) { - listeners.putIfAbsent(path, new ConcurrentHashMap()); - dataListenerMap = listeners.get(path); - } - TargetDataListener targetListener = dataListenerMap.get(listener); - if (targetListener == null) { - dataListenerMap.putIfAbsent(listener, createTargetDataListener(path, listener)); - targetListener = dataListenerMap.get(listener); - } - addTargetDataListener(path, targetListener); + this.addDataListener(path, listener, null); } @Override @@ -131,6 +121,17 @@ public void addDataListener(String path, DataListener listener, Executor executo addTargetDataListener(path, targetListener, executor); } + @Override + public void removeDataListener(String path, DataListener listener ){ + ConcurrentMap dataListenerMap = listeners.get(path); + if (dataListenerMap != null) { + TargetDataListener targetListener = dataListenerMap.remove(listener); + if(targetListener != null){ + removeTargetDataListener(path, targetListener); + } + } + } + @Override public void removeChildListener(String path, ChildListener listener) { ConcurrentMap listeners = childListeners.get(path); @@ -207,6 +208,8 @@ public String getContent(String path) { protected abstract void addTargetDataListener(String path, TargetDataListener listener, Executor executor); + protected abstract void removeTargetDataListener(String path, TargetDataListener listener); + protected abstract void removeTargetChildListener(String path, TargetChildListener listener); protected abstract String doGetContent(String path); diff --git a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java index a772de5f4bd..f1882e1c759 100644 --- a/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java +++ b/dubbo-remoting/dubbo-remoting-zookeeper/src/test/java/org/apache/dubbo/remoting/zookeeper/curator/CuratorZookeeperClientTest.java @@ -173,7 +173,7 @@ public void testAddTargetDataListener() throws Exception { String valueFromCache = curatorClient.getContent(path + "/d.json"); Assertions.assertEquals(value, valueFromCache); final AtomicInteger atomicInteger = new AtomicInteger(0); - curatorClient.addTargetDataListener(listenerPath, new TreeCacheListener() { + curatorClient.addTargetDataListener(listenerPath, new CuratorZookeeperClient.CuratorWatcherImpl() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { System.out.println("===" + event);