diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java index c55696790a7..e8908e0e933 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java @@ -23,6 +23,7 @@ import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.event.EventDispatcher; import org.apache.dubbo.event.EventListener; +import org.apache.dubbo.registry.NotifyListener; import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent; import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener; @@ -198,16 +199,22 @@ default Map> getInstances(Iterable service /** * Add an instance of {@link ServiceInstancesChangedListener} for specified service *

- * Default, the ServiceInstancesChangedListener will be {@link EventDispatcher#addEventListener(EventListener) added} - * into {@link EventDispatcher} + * Default, Current method will be invoked by {@link ServiceDiscoveryRegistry#subscribe(URL, NotifyListener) + * the ServiceDiscoveryRegistry on the subscription}, and it's mandatory to + * {@link EventDispatcher#addEventListener(EventListener) add} the {@link ServiceInstancesChangedListener} argument + * into {@link EventDispatcher} whether the subclass implements same approach or not, thus this method is used to + * trigger or adapt the vendor's change notification mechanism typically, like Zookeeper Watcher, + * Nacos EventListener. If the registry observes the change, It's suggested that the implementation could invoke + * {@link #dispatchServiceInstancesChangedEvent(String)} method or variants * * @param listener an instance of {@link ServiceInstancesChangedListener} * @throws NullPointerException * @throws IllegalArgumentException + * @see EventPublishingServiceDiscovery + * @see EventDispatcher */ default void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException { - getDefaultExtension().addEventListener(listener); } /** diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java index b4ba20622bd..d30e59b84aa 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java @@ -105,7 +105,7 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry { private final WritableMetadataService writableMetadataService; - private final Set listenedServices = new LinkedHashSet<>(); + private final Set registeredListeners = new LinkedHashSet<>(); public ServiceDiscoveryRegistry(URL registryURL) { super(registryURL); @@ -287,7 +287,7 @@ protected void subscribeURLs(URL url, NotifyListener listener, String serviceNam subscribeURLs(url, listener, serviceName, serviceInstances); // register ServiceInstancesChangedListener - registerServiceInstancesChangedListener(new ServiceInstancesChangedListener(serviceName) { + registerServiceInstancesChangedListener(url, new ServiceInstancesChangedListener(serviceName) { @Override public void onEvent(ServiceInstancesChangedEvent event) { @@ -299,14 +299,20 @@ public void onEvent(ServiceInstancesChangedEvent event) { /** * Register the {@link ServiceInstancesChangedListener} If absent * + * @param url {@link URL} * @param listener the {@link ServiceInstancesChangedListener} */ - private void registerServiceInstancesChangedListener(ServiceInstancesChangedListener listener) { - if (listenedServices.add(listener.getServiceName())) { + private void registerServiceInstancesChangedListener(URL url, ServiceInstancesChangedListener listener) { + String listenerId = createListenerId(url, listener); + if (registeredListeners.add(listenerId)) { serviceDiscovery.addServiceInstancesChangedListener(listener); } } + private String createListenerId(URL url, ServiceInstancesChangedListener listener) { + return listener.getServiceName() + ":" + url.toString(VERSION_KEY, GROUP_KEY, PROTOCOL_KEY); + } + protected void subscribeURLs(URL subscribedURL, NotifyListener listener, String serviceName, Collection serviceInstances) { diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/CompositeMetadataServiceURLBuilder.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/CompositeMetadataServiceURLBuilder.java index 8006c852a78..ef19d8666f7 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/CompositeMetadataServiceURLBuilder.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/CompositeMetadataServiceURLBuilder.java @@ -19,7 +19,6 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.metadata.MetadataService; import org.apache.dubbo.registry.client.ServiceInstance; -import org.apache.dubbo.registry.client.metadata.proxy.MetadataServiceProxy; import java.util.Iterator; import java.util.LinkedList; @@ -34,10 +33,9 @@ /** * The implementation of {@link MetadataServiceURLBuilder} composites the multiple {@link MetadataServiceURLBuilder} * instances are loaded by Java standard {@link ServiceLoader} will aggregate {@link URL URLs} for - * {@link MetadataServiceProxy} + * {@link MetadataService} * * @see MetadataServiceURLBuilder - * @see MetadataServiceProxy * @see MetadataService * @see URL * @see ServiceLoader diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/proxy/MetadataServiceProxy.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/proxy/MetadataServiceProxy.java deleted file mode 100644 index a5b73840f41..00000000000 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/proxy/MetadataServiceProxy.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.registry.client.metadata.proxy; - -import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.logger.Logger; -import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.metadata.MetadataService; -import org.apache.dubbo.registry.client.ServiceInstance; -import org.apache.dubbo.rpc.Invoker; -import org.apache.dubbo.rpc.Protocol; -import org.apache.dubbo.rpc.proxy.InvokerInvocationHandler; - -import java.util.Iterator; -import java.util.List; -import java.util.SortedSet; -import java.util.function.Function; - -import static java.lang.reflect.Proxy.newProxyInstance; -import static org.apache.dubbo.registry.client.metadata.MetadataServiceURLBuilder.composite; - -/** - * The Proxy object for the {@link MetadataService} whose {@link ServiceInstance} providers may export multiple - * {@link Protocol protocols} at the same time. - * - * @see ServiceInstance - * @see MetadataService - * @since 2.7.4 - */ -public class MetadataServiceProxy implements MetadataService { - - private final Logger logger = LoggerFactory.getLogger(getClass()); - - private final List urls; - - private final Protocol protocol; - - public MetadataServiceProxy(ServiceInstance serviceInstance, Protocol protocol) { - this(composite().build(serviceInstance), protocol); - } - - public MetadataServiceProxy(List urls, Protocol protocol) { - this.urls = urls; - this.protocol = protocol; - } - - @Override - public String serviceName() { - return doInMetadataService(MetadataService::serviceName); - } - - @Override - public SortedSet getExportedURLs(String serviceInterface, String group, String version, String protocol) { - return doInMetadataService(metadataService -> - metadataService.getExportedURLs(serviceInterface, group, version, protocol)); - } - - @Override - public String getServiceDefinition(String interfaceName, String version, String group) { - return doInMetadataService(metadataService -> - metadataService.getServiceDefinition(interfaceName, version, group)); - } - - @Override - public String getServiceDefinition(String serviceKey) { - return doInMetadataService(metadataService -> - metadataService.getServiceDefinition(serviceKey)); - } - - protected T doInMetadataService(Function callback) { - - T result = null; // execution result - - Throwable exception = null; // exception maybe present - - Iterator iterator = urls.iterator(); - - while (iterator.hasNext()) { // Executes MetadataService's method until success - URL url = iterator.next(); - Invoker invoker = null; - try { - invoker = this.protocol.refer(MetadataService.class, url); - MetadataService proxy = (MetadataService) newProxyInstance(getClass().getClassLoader(), - new Class[]{MetadataService.class}, new InvokerInvocationHandler(invoker)); - result = callback.apply(proxy); - exception = null; - } catch (Throwable e) { - exception = e; - // If met with some error, invoke next - if (logger.isErrorEnabled()) { - logger.error(e.getMessage(), e); - } - } finally { - if (invoker != null) { - // to destroy the Invoker finally - invoker.destroy(); - invoker = null; - } - } - } - - if (exception != null) { // If all executions were failed - throw new RuntimeException(exception.getMessage(), exception); - } - - return result; - } -}