diff --git a/dubbo-all/pom.xml b/dubbo-all/pom.xml index 730b73cf5d3..f38201f8e30 100644 --- a/dubbo-all/pom.xml +++ b/dubbo-all/pom.xml @@ -234,6 +234,13 @@ compile true + + org.apache.dubbo + dubbo-registry-consul + ${project.version} + compile + true + org.apache.dubbo dubbo-monitor-api @@ -346,6 +353,13 @@ compile true + + org.apache.dubbo + dubbo-configcenter-consul + ${project.version} + compile + true + org.apache.dubbo dubbo-compatible @@ -381,6 +395,13 @@ compile true + + org.apache.dubbo + dubbo-metadata-report-consul + ${project.version} + compile + true + @@ -468,6 +489,7 @@ org.apache.dubbo:dubbo-registry-multicast org.apache.dubbo:dubbo-registry-zookeeper org.apache.dubbo:dubbo-registry-redis + org.apache.dubbo:dubbo-registry-consul org.apache.dubbo:dubbo-monitor-api org.apache.dubbo:dubbo-monitor-default org.apache.dubbo:dubbo-config-api @@ -488,10 +510,12 @@ org.apache.dubbo:dubbo-configcenter-definition org.apache.dubbo:dubbo-configcenter-apollo org.apache.dubbo:dubbo-configcenter-zookeeper + org.apache.dubbo:dubbo-configcenter-consul org.apache.dubbo:dubbo-metadata-report-api org.apache.dubbo:dubbo-metadata-definition org.apache.dubbo:dubbo-metadata-report-redis org.apache.dubbo:dubbo-metadata-report-zookeeper + org.apache.dubbo:dubbo-metadata-report-consul diff --git a/dubbo-bom/pom.xml b/dubbo-bom/pom.xml index c0997313247..73e73dfb681 100644 --- a/dubbo-bom/pom.xml +++ b/dubbo-bom/pom.xml @@ -218,6 +218,11 @@ dubbo-registry-redis ${project.version} + + org.apache.dubbo + dubbo-registry-consul + ${project.version} + org.apache.dubbo dubbo-monitor-api @@ -303,6 +308,11 @@ dubbo-metadata-report-redis ${project.version} + + org.apache.dubbo + dubbo-metadata-report-consul + ${project.version} + org.apache.dubbo dubbo-configcenter-api @@ -318,6 +328,11 @@ dubbo-configcenter-apollo ${project.version} + + org.apache.dubbo + dubbo-configcenter-consul + ${project.version} + org.apache.dubbo dubbo-metadata-definition diff --git a/dubbo-configcenter/dubbo-configcenter-consul/pom.xml b/dubbo-configcenter/dubbo-configcenter-consul/pom.xml new file mode 100644 index 00000000000..2a7580e837e --- /dev/null +++ b/dubbo-configcenter/dubbo-configcenter-consul/pom.xml @@ -0,0 +1,44 @@ + + + + + + dubbo-configcenter + org.apache.dubbo + 2.7.1-SNAPSHOT + + 4.0.0 + + dubbo-configcenter-consul + + + + org.apache.dubbo + dubbo-configcenter-api + ${project.parent.version} + + + com.ecwid.consul + consul-api + + + + + diff --git a/dubbo-configcenter/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfiguration.java new file mode 100644 index 00000000000..5ef1985e96f --- /dev/null +++ b/dubbo-configcenter/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfiguration.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.configcenter.consul; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.NamedThreadFactory; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.configcenter.ConfigChangeEvent; +import org.apache.dubbo.configcenter.ConfigurationListener; +import org.apache.dubbo.configcenter.DynamicConfiguration; + +import com.ecwid.consul.v1.ConsulClient; +import com.ecwid.consul.v1.QueryParams; +import com.ecwid.consul.v1.Response; +import com.ecwid.consul.v1.kv.model.GetValue; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; + +import static java.util.concurrent.Executors.newCachedThreadPool; +import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY; +import static org.apache.dubbo.common.Constants.PATH_SEPARATOR; + +/** + * config center implementation for consul + */ +public class ConsulDynamicConfiguration implements DynamicConfiguration { + private static final Logger logger = LoggerFactory.getLogger(ConsulDynamicConfiguration.class); + + private static final int DEFAULT_PORT = 8500; + private static final int DEFAULT_WATCH_TIMEOUT = 60 * 1000; + private static final String WATCH_TIMEOUT = "consul-watch-timeout"; + + private URL url; + private String rootPath; + private ConsulClient client; + private int watchTimeout = -1; + private ConcurrentMap watchers = new ConcurrentHashMap<>(); + private ConcurrentMap consulIndexes = new ConcurrentHashMap<>(); + private ExecutorService watcherService = newCachedThreadPool( + new NamedThreadFactory("dubbo-consul-configuration-watcher", true)); + + public ConsulDynamicConfiguration(URL url) { + this.url = url; + this.rootPath = PATH_SEPARATOR + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + PATH_SEPARATOR + "config"; + this.watchTimeout = buildWatchTimeout(url); + String host = url.getHost(); + int port = url.getPort() != 0 ? url.getPort() : DEFAULT_PORT; + client = new ConsulClient(host, port); + } + + @Override + public void addListener(String key, String group, ConfigurationListener listener) { + logger.info("register listener " + listener.getClass() + " for config with key: " + key + ", group: " + group); + ConsulKVWatcher watcher = watchers.putIfAbsent(key, new ConsulKVWatcher(key)); + if (watcher == null) { + watcher = watchers.get(key); + watcherService.submit(watcher); + } + watcher.addListener(listener); + } + + @Override + public void removeListener(String key, String group, ConfigurationListener listener) { + logger.info("unregister listener " + listener.getClass() + " for config with key: " + key + ", group: " + group); + ConsulKVWatcher watcher = watchers.get(key); + if (watcher != null) { + watcher.removeListener(listener); + } + } + + @Override + public String getConfig(String key, String group, long timeout) throws IllegalStateException { + if (StringUtils.isNotEmpty(group)) { + key = group + PATH_SEPARATOR + key; + } else { + int i = key.lastIndexOf("."); + key = key.substring(0, i) + PATH_SEPARATOR + key.substring(i + 1); + } + + return (String) getInternalProperty(rootPath + PATH_SEPARATOR + key); + } + + @Override + public Object getInternalProperty(String key) { + logger.info("get config from: " + key); + Long currentIndex = consulIndexes.computeIfAbsent(key, k -> -1L); + Response response = client.getKVValue(key, new QueryParams(watchTimeout, currentIndex)); + GetValue value = response.getValue(); + consulIndexes.put(key, response.getConsulIndex()); + return value != null ? value.getDecodedValue() : null; + } + + private int buildWatchTimeout(URL url) { + return url.getParameter(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000; + } + + private class ConsulKVWatcher implements Runnable { + private String key; + private Set listeners; + private boolean running = true; + + public ConsulKVWatcher(String key) { + this.key = convertKey(key); + this.listeners = new HashSet<>(); + } + + @Override + public void run() { + while (running) { + Long lastIndex = consulIndexes.computeIfAbsent(key, k -> -1L); + Response response = client.getKVValue(key, new QueryParams(watchTimeout, lastIndex)); + + Long currentIndex = response.getConsulIndex(); + if (currentIndex == null || currentIndex <= lastIndex) { + continue; + } + + consulIndexes.put(key, currentIndex); + String value = response.getValue().getDecodedValue(); + logger.info("notify change for key: " + key + ", the value is: " + value); + ConfigChangeEvent event = new ConfigChangeEvent(key, value); + for (ConfigurationListener listener : listeners) { + listener.process(event); + } + } + } + + private void addListener(ConfigurationListener listener) { + this.listeners.add(listener); + } + + private void removeListener(ConfigurationListener listener) { + this.listeners.remove(listener); + } + + private String convertKey(String key) { + int index = key.lastIndexOf('.'); + return rootPath + PATH_SEPARATOR + key.substring(0, index) + PATH_SEPARATOR + key.substring(index + 1); + } + + private void stop() { + running = false; + } + } +} diff --git a/dubbo-configcenter/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfigurationFactory.java b/dubbo-configcenter/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfigurationFactory.java new file mode 100644 index 00000000000..813b6174662 --- /dev/null +++ b/dubbo-configcenter/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfigurationFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.configcenter.consul; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.configcenter.AbstractDynamicConfigurationFactory; +import org.apache.dubbo.configcenter.DynamicConfiguration; + +/** + * Config center factory for consul + */ +public class ConsulDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory { + @Override + protected DynamicConfiguration createDynamicConfiguration(URL url) { + return new ConsulDynamicConfiguration(url); + } +} diff --git a/dubbo-configcenter/dubbo-configcenter-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory b/dubbo-configcenter/dubbo-configcenter-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory new file mode 100644 index 00000000000..b7a5091efa3 --- /dev/null +++ b/dubbo-configcenter/dubbo-configcenter-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory @@ -0,0 +1 @@ +consul=org.apache.dubbo.configcenter.consul.ConsulDynamicConfigurationFactory diff --git a/dubbo-configcenter/pom.xml b/dubbo-configcenter/pom.xml index 6512dc4fd57..fa703be6864 100644 --- a/dubbo-configcenter/pom.xml +++ b/dubbo-configcenter/pom.xml @@ -33,5 +33,6 @@ dubbo-configcenter-api dubbo-configcenter-zookeeper dubbo-configcenter-apollo + dubbo-configcenter-consul - \ No newline at end of file + diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml index 2b1666b80d6..c9fa9636592 100644 --- a/dubbo-dependencies-bom/pom.xml +++ b/dubbo-dependencies-bom/pom.xml @@ -105,6 +105,7 @@ 4.0.1 2.12.0 2.9.0 + 1.4.2 1.3.6 3.1.15 0.8.0 @@ -232,6 +233,11 @@ jedis ${jedis_version} + + com.ecwid.consul + consul-api + ${consul_version} + com.googlecode.xmemcached xmemcached diff --git a/dubbo-metadata-report/dubbo-metadata-report-consul/pom.xml b/dubbo-metadata-report/dubbo-metadata-report-consul/pom.xml new file mode 100644 index 00000000000..ad02eaedda6 --- /dev/null +++ b/dubbo-metadata-report/dubbo-metadata-report-consul/pom.xml @@ -0,0 +1,43 @@ + + + + + + dubbo-metadata-report + org.apache.dubbo + 2.7.1-SNAPSHOT + + 4.0.0 + + dubbo-metadata-report-consul + + + + org.apache.dubbo + dubbo-metadata-report-api + ${project.parent.version} + + + com.ecwid.consul + consul-api + + + + diff --git a/dubbo-metadata-report/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReport.java b/dubbo-metadata-report/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReport.java new file mode 100644 index 00000000000..6e24fd9dab3 --- /dev/null +++ b/dubbo-metadata-report/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReport.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.metadata.store.consul; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.metadata.identifier.MetadataIdentifier; +import org.apache.dubbo.metadata.support.AbstractMetadataReport; +import org.apache.dubbo.rpc.RpcException; + +import com.ecwid.consul.v1.ConsulClient; + +/** + * metadata report impl for consul + */ +public class ConsulMetadataReport extends AbstractMetadataReport { + private static final Logger logger = LoggerFactory.getLogger(ConsulMetadataReport.class); + private static final int DEFAULT_PORT = 8500; + + private ConsulClient client; + + public ConsulMetadataReport(URL url) { + super(url); + + String host = url.getHost(); + int port = url.getPort() != 0 ? url.getPort() : DEFAULT_PORT; + client = new ConsulClient(host, port); + } + + @Override + protected void doStoreProviderMetadata(MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions) { + this.storeMetadata(providerMetadataIdentifier, serviceDefinitions); + } + + @Override + protected void doStoreConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, String value) { + this.storeMetadata(consumerMetadataIdentifier, value); + } + + private void storeMetadata(MetadataIdentifier identifier, String v) { + try { + client.setKVValue(identifier.getIdentifierKey() + META_DATA_SOTRE_TAG, v); + } catch (Throwable t) { + logger.error("Failed to put " + identifier + " to consul " + v + ", cause: " + t.getMessage(), t); + throw new RpcException("Failed to put " + identifier + " to consul " + v + ", cause: " + t.getMessage(), t); + } + } +} diff --git a/dubbo-metadata-report/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReportFactory.java b/dubbo-metadata-report/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReportFactory.java new file mode 100644 index 00000000000..66d7b5e5e45 --- /dev/null +++ b/dubbo-metadata-report/dubbo-metadata-report-consul/src/main/java/org/apache/dubbo/metadata/store/consul/ConsulMetadataReportFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.metadata.store.consul; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.metadata.store.MetadataReport; +import org.apache.dubbo.metadata.support.AbstractMetadataReportFactory; + +/** + * metadata report factory impl for consul + */ +public class ConsulMetadataReportFactory extends AbstractMetadataReportFactory { + @Override + protected MetadataReport createMetadataReport(URL url) { + return new ConsulMetadataReport(url); + } +} diff --git a/dubbo-metadata-report/dubbo-metadata-report-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.store.MetadataReportFactory b/dubbo-metadata-report/dubbo-metadata-report-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.store.MetadataReportFactory new file mode 100644 index 00000000000..1f27535d442 --- /dev/null +++ b/dubbo-metadata-report/dubbo-metadata-report-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.store.MetadataReportFactory @@ -0,0 +1 @@ +consul=org.apache.dubbo.metadata.store.consul.ConsulMetadataReportFactory diff --git a/dubbo-metadata-report/pom.xml b/dubbo-metadata-report/pom.xml index 195ae5e9299..aa14d56122e 100644 --- a/dubbo-metadata-report/pom.xml +++ b/dubbo-metadata-report/pom.xml @@ -29,6 +29,7 @@ dubbo-metadata-report-zookeeper dubbo-metadata-report-redis dubbo-metadata-definition + dubbo-metadata-report-consul diff --git a/dubbo-registry/dubbo-registry-consul/pom.xml b/dubbo-registry/dubbo-registry-consul/pom.xml new file mode 100644 index 00000000000..38647f259ac --- /dev/null +++ b/dubbo-registry/dubbo-registry-consul/pom.xml @@ -0,0 +1,43 @@ + + + + + + dubbo-registry + org.apache.dubbo + 2.7.1-SNAPSHOT + + 4.0.0 + + dubbo-registry-consul + + + + org.apache.dubbo + dubbo-registry-api + ${project.parent.version} + + + com.ecwid.consul + consul-api + + + + diff --git a/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistry.java b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistry.java new file mode 100644 index 00000000000..72f7ff43b86 --- /dev/null +++ b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistry.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.registry.consul; + +import org.apache.dubbo.common.Constants; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.NamedThreadFactory; +import org.apache.dubbo.registry.NotifyListener; +import org.apache.dubbo.registry.support.FailbackRegistry; + +import com.ecwid.consul.v1.ConsulClient; +import com.ecwid.consul.v1.QueryParams; +import com.ecwid.consul.v1.Response; +import com.ecwid.consul.v1.agent.model.NewService; +import com.ecwid.consul.v1.catalog.CatalogServicesRequest; +import com.ecwid.consul.v1.health.HealthServicesRequest; +import com.ecwid.consul.v1.health.model.HealthService; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; + +import static java.util.concurrent.Executors.newCachedThreadPool; +import static org.apache.dubbo.common.Constants.ANY_VALUE; + +/** + * registry center implementation for consul + */ +public class ConsulRegistry extends FailbackRegistry { + private static final Logger logger = LoggerFactory.getLogger(ConsulRegistry.class); + + private static final String SERVICE_TAG = "dubbo"; + private static final String URL_META_KEY = "url"; + private static final String WATCH_TIMEOUT = "consul-watch-timeout"; + private static final String CHECK_INTERVAL = "consul-check-interval"; + private static final String CHECK_TIMEOUT = "consul-check-timeout"; + private static final String DEREGISTER_AFTER = "consul-deregister-critical-service-after"; + + private static final int DEFAULT_PORT = 8500; + // default watch timeout in millisecond + private static final int DEFAULT_WATCH_TIMEOUT = 60 * 1000; + // default tcp check interval + private static final String DEFAULT_CHECK_INTERVAL = "10s"; + // default tcp check timeout + private static final String DEFAULT_CHECK_TIMEOUT = "1s"; + // default deregister critical server after + private static final String DEFAULT_DEREGISTER_TIME = "20s"; + + private ConsulClient client; + + private ExecutorService notifierExecutor = newCachedThreadPool( + new NamedThreadFactory("dubbo-consul-notifier", true)); + private ConcurrentMap notifiers = new ConcurrentHashMap<>(); + + public ConsulRegistry(URL url) { + super(url); + String host = url.getHost(); + int port = url.getPort() != 0 ? url.getPort() : DEFAULT_PORT; + client = new ConsulClient(host, port); + } + + @Override + public void register(URL url) { + if (isConsumerSide(url)) { + return; + } + + super.register(url); + } + + @Override + public void doRegister(URL url) { + client.agentServiceRegister(buildService(url)); + } + + @Override + public void unregister(URL url) { + if (isConsumerSide(url)) { + return; + } + + super.unregister(url); + } + + @Override + public void doUnregister(URL url) { + client.agentServiceDeregister(buildId(url)); + } + + @Override + public void subscribe(URL url, NotifyListener listener) { + if (isProviderSide(url)) { + return; + } + + super.subscribe(url, listener); + } + + @Override + public void doSubscribe(URL url, NotifyListener listener) { + Long index; + List urls; + if (ANY_VALUE.equals(url.getServiceInterface())) { + Response>> response = getAllServices(-1, buildWatchTimeout(url)); + index = response.getConsulIndex(); + List services = getHealthServices(response.getValue()); + urls = convert(services); + } else { + String service = url.getServiceKey(); + Response> response = getHealthServices(service, -1, buildWatchTimeout(url)); + index = response.getConsulIndex(); + urls = convert(response.getValue()); + } + + notify(url, listener, urls); + ConsulNotifier notifier = notifiers.computeIfAbsent(url, k -> new ConsulNotifier(url, index)); + notifierExecutor.submit(notifier); + } + + @Override + public void unsubscribe(URL url, NotifyListener listener) { + if (isProviderSide(url)) { + return; + } + + super.unsubscribe(url, listener); + } + + @Override + public void doUnsubscribe(URL url, NotifyListener listener) { + ConsulNotifier notifier = notifiers.remove(url); + notifier.stop(); + } + + @Override + public boolean isAvailable() { + return client.getAgentSelf() != null; + } + + @Override + public void destroy() { + super.destroy(); + notifierExecutor.shutdown(); + } + + private Response> getHealthServices(String service, long index, int watchTimeout) { + HealthServicesRequest request = HealthServicesRequest.newBuilder() + .setTag(SERVICE_TAG) + .setQueryParams(new QueryParams(watchTimeout, index)) + .setPassing(true) + .build(); + return client.getHealthServices(service, request); + } + + private Response>> getAllServices(long index, int watchTimeout) { + CatalogServicesRequest request = CatalogServicesRequest.newBuilder() + .setQueryParams(new QueryParams(watchTimeout, index)) + .build(); + return client.getCatalogServices(request); + } + + private List getHealthServices(Map> services) { + return services.keySet().stream() + .filter(s -> services.get(s).contains(SERVICE_TAG)) + .map(s -> getHealthServices(s, -1, -1).getValue()) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } + + + private boolean isConsumerSide(URL url) { + return url.getProtocol().equals(Constants.CONSUMER_PROTOCOL); + } + + private boolean isProviderSide(URL url) { + return url.getProtocol().equals(Constants.PROVIDER_PROTOCOL); + } + + private List convert(List services) { + return services.stream() + .map(s -> s.getService().getMeta().get(URL_META_KEY)) + .map(URL::valueOf) + .collect(Collectors.toList()); + } + + private NewService buildService(URL url) { + NewService service = new NewService(); + service.setAddress(url.getHost()); + service.setPort(url.getPort()); + service.setId(buildId(url)); + service.setName(url.getServiceInterface()); + service.setCheck(buildCheck(url)); + service.setTags(buildTags(url)); + service.setMeta(Collections.singletonMap(URL_META_KEY, url.toFullString())); + return service; + } + + private List buildTags(URL url) { + Map params = url.getParameters(); + List tags = params.keySet().stream() + .map(k -> k + "=" + params.get(k)) + .collect(Collectors.toList()); + tags.add(SERVICE_TAG); + return tags; + } + + private String buildId(URL url) { + // let's simply use url's hashcode to generate unique service id for now + return Integer.toHexString(url.hashCode()); + } + + private NewService.Check buildCheck(URL url) { + NewService.Check check = new NewService.Check(); + check.setTcp(url.getAddress()); + check.setInterval(url.getParameter(CHECK_INTERVAL, DEFAULT_CHECK_INTERVAL)); + check.setTimeout(url.getParameter(CHECK_TIMEOUT, DEFAULT_CHECK_TIMEOUT)); + check.setDeregisterCriticalServiceAfter(url.getParameter(DEREGISTER_AFTER, DEFAULT_DEREGISTER_TIME)); + return check; + } + + private int buildWatchTimeout(URL url) { + return url.getParameter(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000; + } + + private class ConsulNotifier implements Runnable { + private URL url; + private long consulIndex; + private boolean running; + + ConsulNotifier(URL url, long consulIndex) { + this.url = url; + this.consulIndex = consulIndex; + this.running = true; + } + + @Override + public void run() { + while (this.running) { + if (ANY_VALUE.equals(url.getServiceInterface())) { + processServices(); + } else { + processService(); + } + } + } + + private void processService() { + String service = url.getServiceKey(); + Response> response = getHealthServices(service, consulIndex, buildWatchTimeout(url)); + Long currentIndex = response.getConsulIndex(); + if (currentIndex != null && currentIndex > consulIndex) { + consulIndex = currentIndex; + List services = response.getValue(); + List urls = convert(services); + for (NotifyListener listener : getSubscribed().get(url)) { + doNotify(url, listener, urls); + } + } + } + + private void processServices() { + Response>> response = getAllServices(consulIndex, buildWatchTimeout(url)); + Long currentIndex = response.getConsulIndex(); + if (currentIndex != null && currentIndex > consulIndex) { + consulIndex = currentIndex; + List services = getHealthServices(response.getValue()); + List urls = convert(services); + for (NotifyListener listener : getSubscribed().get(url)) { + doNotify(url, listener, urls); + } + } + } + + void stop() { + this.running = false; + } + } +} diff --git a/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistryFactory.java b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistryFactory.java new file mode 100644 index 00000000000..c36f009c0d0 --- /dev/null +++ b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulRegistryFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.registry.consul; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.registry.Registry; +import org.apache.dubbo.registry.support.AbstractRegistryFactory; + +/** + * registry center factory implementation for consul + */ +public class ConsulRegistryFactory extends AbstractRegistryFactory { + @Override + protected Registry createRegistry(URL url) { + return new ConsulRegistry(url); + } +} diff --git a/dubbo-registry/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory b/dubbo-registry/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory new file mode 100644 index 00000000000..7aea18f4d8f --- /dev/null +++ b/dubbo-registry/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory @@ -0,0 +1 @@ +consul=org.apache.dubbo.registry.consul.ConsulRegistryFactory diff --git a/dubbo-registry/pom.xml b/dubbo-registry/pom.xml index 93f64bfe8a4..f74cca0efdd 100644 --- a/dubbo-registry/pom.xml +++ b/dubbo-registry/pom.xml @@ -34,5 +34,6 @@ dubbo-registry-multicast dubbo-registry-zookeeper dubbo-registry-redis + dubbo-registry-consul diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index 7678977a47a..77443016cfb 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -152,8 +152,8 @@ public void connected(Channel channel) throws RemotingException { @Override public void disconnected(Channel channel) throws RemotingException { - if (logger.isInfoEnabled()) { - logger.info("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl()); + if (logger.isDebugEnabled()) { + logger.debug("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl()); } invoke(channel, Constants.ON_DISCONNECT_KEY); }