Skip to content

Commit

Permalink
Polish apache#4713 : Add Service registration and discovery implement…
Browse files Browse the repository at this point in the history
…ation for Eureka
  • Loading branch information
mercyblitz committed Aug 22, 2019
1 parent b992436 commit 072ecc7
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.dubbo.registry.Registry;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils;
import org.apache.dubbo.registry.client.metadata.proxy.MetadataServiceProxyFactory;
import org.apache.dubbo.registry.client.selector.ServiceInstanceSelector;
import org.apache.dubbo.registry.support.FailbackRegistry;
Expand Down Expand Up @@ -112,7 +113,13 @@ public ServiceDiscoveryRegistry(URL registryURL) {
this.writableMetadataService = WritableMetadataService.getExtension(metadataStorageType);
}

protected Set<String> getSubscribedServices(URL registryURL) {
/**
* Get the subscribed services from the specified registry {@link URL url}
*
* @param registryURL the specified registry {@link URL url}
* @return non-null
*/
public static Set<String> getSubscribedServices(URL registryURL) {
String subscribedServiceNames = registryURL.getParameter(SUBSCRIBED_SERVICE_NAMES_KEY);
return isBlank(subscribedServiceNames) ? emptySet() :
unmodifiableSet(of(subscribedServiceNames.split(","))
Expand Down Expand Up @@ -307,6 +314,7 @@ private List<URL> getSubscribedURLs(URL subscribedURL, Collection<ServiceInstanc
List<ServiceInstance> serviceInstances = instances.stream()
.filter(ServiceInstance::isEnabled)
.filter(ServiceInstance::isHealthy)
.filter(ServiceInstanceMetadataUtils::isDubboServiceInstance)
.collect(Collectors.toList());

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,26 @@ public static String getMetadataStorageType(ServiceInstance serviceInstance) {
/**
* Set the metadata storage type in specified {@link ServiceInstance service instance}
*
* @param serviceInstance {@link ServiceInstance service instance}
* @param metadataType remote or local
* @param serviceInstance {@link ServiceInstance service instance}
* @param metadataType remote or local
*/
public static void setMetadataStorageType(ServiceInstance serviceInstance, String metadataType) {
Map<String, String> metadata = serviceInstance.getMetadata();
metadata.put(METADATA_STORAGE_TYPE_KEY, metadataType);
}

/**
* Is Dubbo Service instance or not
*
* @param serviceInstance {@link ServiceInstance service instance}
* @return if Dubbo Service instance, return <code>true</code>, or <code>false</code>
*/
public static boolean isDubboServiceInstance(ServiceInstance serviceInstance) {
Map<String, String> metadata = serviceInstance.getMetadata();
return metadata.containsKey(METADATA_SERVICE_URL_PARAMS_KEY)
|| metadata.containsKey(METADATA_SERVICE_URLS_PROPERTY_NAME);
}

private static void setProviderHostParam(Map<String, String> params, URL providerURL) {
params.put(HOST_PARAM_NAME, providerURL.getHost());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@
package org.apache.dubbo.registry.eureka;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.event.EventDispatcher;
import org.apache.dubbo.registry.client.DefaultServiceInstance;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;

import com.netflix.appinfo.ApplicationInfoManager;
import com.netflix.appinfo.EurekaInstanceConfig;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.config.ConfigurationManager;
import com.netflix.discovery.CacheRefreshedEvent;
import com.netflix.discovery.DefaultEurekaClientConfig;
import com.netflix.discovery.DiscoveryClient;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.EurekaClientConfig;
import com.netflix.discovery.EurekaEvent;
import com.netflix.discovery.shared.Application;
import com.netflix.discovery.shared.Applications;

Expand All @@ -37,24 +41,37 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;

import static java.util.Collections.emptyList;
import static org.apache.dubbo.event.EventDispatcher.getDefaultExtension;
import static org.apache.dubbo.registry.client.ServiceDiscoveryRegistry.getSubscribedServices;

/**
* Eureka {@link ServiceDiscovery} implementation based on Eureka API
*/
public class EurekaServiceDiscovery implements ServiceDiscovery {

private final EventDispatcher eventDispatcher = getDefaultExtension();

private ApplicationInfoManager applicationInfoManager;

private EurekaClient eurekaClient;

private Set<String> subscribedServices;

/**
* last apps hash code is used to identify the {@link Applications} is changed or not
*/
private String lastAppsHashCode;

@Override
public void initialize(URL registryURL) throws Exception {
Properties eurekaConfigProperties = buildEurekaConfigProperties(registryURL);
initConfigurationManager(eurekaConfigProperties);
initSubscribedServices(registryURL);
}

/**
Expand All @@ -76,6 +93,15 @@ private Properties buildEurekaConfigProperties(URL registryURL) {
return properties;
}

/**
* Initialize {@link #subscribedServices} property
*
* @param registryURL the {@link URL url} to connect Eureka
*/
private void initSubscribedServices(URL registryURL) {
this.subscribedServices = getSubscribedServices(registryURL);
}

private boolean filterEurekaProperty(Map.Entry<String, String> propertyEntry) {
String propertyName = propertyEntry.getKey();
return propertyName.startsWith("eureka.");
Expand Down Expand Up @@ -129,8 +155,44 @@ private void initEurekaClient(ServiceInstance serviceInstance) {
return;
}
initApplicationInfoManager(serviceInstance);
EurekaClient eurekaClient = createEurekaClient();
registerEurekaEventListener(eurekaClient);
// set eurekaClient
this.eurekaClient = eurekaClient;
}

private void registerEurekaEventListener(EurekaClient eurekaClient) {
eurekaClient.registerEventListener(this::onEurekaEvent);
}

private void onEurekaEvent(EurekaEvent event) {
if (event instanceof CacheRefreshedEvent) {
onCacheRefreshedEvent(CacheRefreshedEvent.class.cast(event));
}
}

private void onCacheRefreshedEvent(CacheRefreshedEvent event) {
synchronized (this) { // Make sure thread-safe in async execution
Applications applications = eurekaClient.getApplications();
String appsHashCode = applications.getAppsHashCode();
if (!Objects.equals(lastAppsHashCode, appsHashCode)) { // Changed
// Dispatch Events
dispatchServiceInstancesChangedEvent();
lastAppsHashCode = appsHashCode; // update current result
}
}
}

private void dispatchServiceInstancesChangedEvent() {
subscribedServices.forEach(serviceName -> {
eventDispatcher.dispatch(new ServiceInstancesChangedEvent(serviceName, getInstances(serviceName)));
});
}

private EurekaClient createEurekaClient() {
EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
this.eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
DiscoveryClient eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
return eurekaClient;
}

@Override
Expand Down

0 comments on commit 072ecc7

Please sign in to comment.