Skip to content

Commit

Permalink
Support multi registries metrics key (#12582)
Browse files Browse the repository at this point in the history
* init

* fix

* fix

* fix

* fix

* add licence

* fix testcase

* fix testcase

* lowcase

* remove unuse

* opt notify&directory

* fix post&finish

* revert name

* move registerMetadataAndInstance

* move metrics from

* remove some unuse

* remove unuse

* fix style

* fix style

* remove unuse

* remove unuse

* fix default&& move post

* add multi registry lables for subscribe

* fix

* fix url

* fix url

* Fix uts

* add registry key for directory

* Compatible with empty registry keys

* remove unus

* Support registry type

* Fix attachment

* Code enhancement

* Fix registry key

* Fix service discovery notify

* fix

---------

Co-authored-by: songxiaosheng <songxiaosheng@elastic.link>
Co-authored-by: Albumen Kevin <jhq0812@gmail.com>
  • Loading branch information
3 people authored Jul 31, 2023
1 parent 1087070 commit 0add70e
Show file tree
Hide file tree
Showing 34 changed files with 594 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public void checkConnectivity() {
}
}, reconnectTaskPeriod, TimeUnit.MILLISECONDS);
}
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary()));
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary(), getDirectoryMeta()));
}

/**
Expand All @@ -366,7 +366,11 @@ public void refreshInvoker() {
if (invokersInitialized) {
refreshInvokerInternal();
}
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary()));
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary(), getDirectoryMeta()));
}

protected Map<String, String> getDirectoryMeta() {
return Collections.emptyMap();
}

private synchronized void refreshInvokerInternal() {
Expand Down Expand Up @@ -395,7 +399,7 @@ public void addDisabledInvoker(Invoker<T> invoker) {
removeValidInvoker(invoker);
logger.info("Disable service address: " + invoker.getUrl() + ".");
}
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary()));
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary(), getDirectoryMeta()));
}

@Override
Expand All @@ -408,7 +412,7 @@ public void recoverDisabledInvoker(Invoker<T> invoker) {

}
}
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary()));
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary(), getDirectoryMeta()));
}

protected final void refreshRouter(BitList<Invoker<T>> newlyInvokers, Runnable switchAction) {
Expand Down Expand Up @@ -465,7 +469,7 @@ protected void setInvokers(BitList<Invoker<T>> invokers) {
refreshInvokerInternal();
this.invokersInitialized = true;

MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary()));
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary(), getDirectoryMeta()));
}

protected void destroyInvokers() {
Expand All @@ -480,7 +484,7 @@ private boolean addValidInvoker(Invoker<T> invoker) {
synchronized (this.validInvokers) {
result = this.validInvokers.add(invoker);
}
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary()));
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary(), getDirectoryMeta()));
return result;
}

Expand All @@ -489,7 +493,7 @@ private boolean removeValidInvoker(Invoker<T> invoker) {
synchronized (this.validInvokers) {
result = this.validInvokers.remove(invoker);
}
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary()));
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary(), getDirectoryMeta()));
return result;
}

Expand Down Expand Up @@ -519,18 +523,7 @@ private Map<MetricsKey, Map<String, Integer>> getSummary() {
}

private Map<String, Integer> groupByServiceKey(Collection<Invoker<T>> invokers) {

Map<String, Integer> serviceNumMap = new HashMap<>();
for (Invoker<T> invoker : invokers) {
if (invoker.getClass().getSimpleName().contains("Mockito")) {
return serviceNumMap;
}
}
if (invokers.size() > 0) {
serviceNumMap = invokers.stream().filter(invoker -> invoker.getInterface() != null).collect(Collectors.groupingBy(invoker -> invoker.getInterface().getName(), Collectors.reducing(0, e -> 1, Integer::sum)));
}

return serviceNumMap;
return Collections.singletonMap(getConsumerUrl().getServiceKey(), invokers.size());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@
import org.apache.dubbo.rpc.cluster.SingleRouterChain;
import org.apache.dubbo.rpc.cluster.router.state.BitList;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.apache.dubbo.common.constants.LoggerCodeConstants.CLUSTER_FAILED_SITE_SELECTION;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY;

/**
* StaticDirectory
Expand Down Expand Up @@ -125,4 +128,8 @@ protected List<Invoker<T>> doList(SingleRouterChain<T> singleRouterChain, BitLis
return invokers;
}

@Override
protected Map<String, String> getDirectoryMeta() {
return Collections.singletonMap(REGISTRY_KEY, "static");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ public interface CommonConstants {

String INTERFACE_REGISTER_MODE = "interface";

String INSTANCE_REGISTER_MODE = "instance";

String DEFAULT_REGISTER_MODE = "all";

String GENERIC_KEY = "generic";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import org.apache.dubbo.config.support.Parameter;
import org.apache.dubbo.config.utils.ConfigValidationUtils;
import org.apache.dubbo.metadata.ServiceNameMapping;
import org.apache.dubbo.metrics.event.MetricsEventBus;
import org.apache.dubbo.metrics.registry.event.RegistryEvent;
import org.apache.dubbo.registry.client.metadata.MetadataUtils;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;
Expand Down Expand Up @@ -516,21 +514,17 @@ private void doExportUrls(RegisterTypeEnum registerType) {

List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);

MetricsEventBus.post(RegistryEvent.toRsEvent(getApplicationModel(), getUniqueServiceName(), protocols.size() * registryURLs.size()), () -> {
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// stub service will use generated service name
if (!serverService) {
// In case user specified path, registerImmediately service one more time to map it to path.
repository.registerService(pathKey, interfaceClass);
}
doExportUrlsFor1Protocol(protocolConfig, registryURLs, registerType);
}
return null;
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// stub service will use generated service name
if (!serverService) {
// In case user specified path, register service one more time to map it to path.
repository.registerService(pathKey, interfaceClass);
}
);
doExportUrlsFor1Protocol(protocolConfig, registryURLs, registerType);
}

providerModel.setServiceUrls(urls);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.dubbo.metrics.collector.DefaultMetricsCollector;
import org.apache.dubbo.metrics.config.event.ConfigCenterEvent;
import org.apache.dubbo.metrics.event.MetricsEventBus;
import org.apache.dubbo.metrics.registry.event.RegistryEvent;
import org.apache.dubbo.metrics.report.DefaultMetricsReporterFactory;
import org.apache.dubbo.metrics.report.MetricsReporter;
import org.apache.dubbo.metrics.report.MetricsReporterFactory;
Expand Down Expand Up @@ -899,16 +898,10 @@ private DynamicConfiguration getDynamicConfiguration(URL connectionURL) {
private void registerServiceInstance() {
try {
registered = true;
MetricsEventBus.post(RegistryEvent.toRegisterEvent(applicationModel),
() -> {
ServiceInstanceMetadataUtils.registerMetadataAndInstance(applicationModel);
return null;
}
);
ServiceInstanceMetadataUtils.registerMetadataAndInstance(applicationModel);
} catch (Exception e) {
logger.error(CONFIG_REGISTER_INSTANCE_ERROR, "configuration server disconnected", "", "Register instance error.", e);
}

if (registered) {
// scheduled task for updating Metadata and ServiceInstance
asyncMetadataFuture = frameworkExecutorRepository.getSharedScheduledExecutor().scheduleWithFixedDelay(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

public abstract class CombMetricsCollector<E extends TimeCounterEvent> extends AbstractMetricsListener<E> implements ApplicationMetricsCollector<E>, ServiceMetricsCollector<E>, MethodMetricsCollector<E> {

private final BaseStatComposite stats;
protected final BaseStatComposite stats;
private MetricsEventMulticaster eventMulticaster;


Expand Down Expand Up @@ -108,5 +108,9 @@ public void onEventFinish(TimeCounterEvent event) {
public void onEventError(TimeCounterEvent event) {
eventMulticaster.publishErrorEvent(event);
}

protected BaseStatComposite getStats() {
return stats;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package org.apache.dubbo.metrics.data;

import org.apache.dubbo.metrics.collector.MetricsCollector;
import org.apache.dubbo.metrics.model.ApplicationMetric;
import org.apache.dubbo.metrics.model.MethodMetric;
import org.apache.dubbo.metrics.model.MetricsCategory;
import org.apache.dubbo.metrics.model.ServiceKeyMetric;
import org.apache.dubbo.metrics.model.key.MetricsKey;
import org.apache.dubbo.metrics.model.key.MetricsKeyWrapper;
import org.apache.dubbo.metrics.model.sample.MetricSample;
Expand All @@ -29,6 +31,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;


/**
Expand Down Expand Up @@ -70,11 +73,11 @@ protected void init(RtStatComposite rtStatComposite) {
}

public void calcApplicationRt(String registryOpType, Long responseTime) {
rtStatComposite.calcApplicationRt(registryOpType, responseTime);
rtStatComposite.calcServiceKeyRt(registryOpType, responseTime, new ApplicationMetric(rtStatComposite.getApplicationModel()));
}

public void calcServiceKeyRt(String serviceKey, String registryOpType, Long responseTime) {
rtStatComposite.calcServiceKeyRt(serviceKey, registryOpType, responseTime);
rtStatComposite.calcServiceKeyRt(registryOpType, responseTime, new ServiceKeyMetric(rtStatComposite.getApplicationModel(), serviceKey));
}

public void calcServiceKeyRt(Invocation invocation, String registryOpType, Long responseTime) {
Expand All @@ -89,6 +92,10 @@ public void setServiceKey(MetricsKeyWrapper metricsKey, String serviceKey, int n
serviceStatComposite.setServiceKey(metricsKey, serviceKey, num);
}

public void setServiceKey(MetricsKeyWrapper metricsKey, String serviceKey, int num, Map<String, String> extra) {
serviceStatComposite.setExtraServiceKey(metricsKey, serviceKey, num, extra);
}

public void incrementApp(MetricsKey metricsKey, int size) {
applicationStatComposite.incrementSize(metricsKey, size);
}
Expand All @@ -97,6 +104,10 @@ public void incrementServiceKey(MetricsKeyWrapper metricsKeyWrapper, String attS
serviceStatComposite.incrementServiceKey(metricsKeyWrapper, attServiceKey, size);
}

public void incrementServiceKey(MetricsKeyWrapper metricsKeyWrapper, String attServiceKey, Map<String, String> extra, int size) {
serviceStatComposite.incrementExtraServiceKey(metricsKeyWrapper, attServiceKey, extra, size);
}

public void incrementMethodKey(MetricsKeyWrapper metricsKeyWrapper, MethodMetric methodMetric, int size) {
methodStatComposite.incrementMethodKey(metricsKeyWrapper, methodMetric, size);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.dubbo.metrics.data;

import org.apache.dubbo.metrics.model.ApplicationMetric;
import org.apache.dubbo.metrics.model.MethodMetric;
import org.apache.dubbo.metrics.model.Metric;
import org.apache.dubbo.metrics.model.MetricsCategory;
Expand Down Expand Up @@ -89,20 +88,7 @@ private List<LongContainer<? extends Number>> initStats(MetricsPlaceValue placeV
return singleRtStats;
}

public void calcApplicationRt(String registryOpType, Long responseTime) {
ApplicationMetric key = new ApplicationMetric(getApplicationModel());
for (LongContainer container : rtStats.get(registryOpType)) {
Number current = (Number) container.get(key);
if (current == null) {
container.putIfAbsent(key, container.getInitFunc().apply(key));
current = (Number) container.get(key);
}
container.getConsumerFunc().accept(responseTime, current);
}
}

public void calcServiceKeyRt(String serviceKey, String registryOpType, Long responseTime) {
ServiceKeyMetric key = new ServiceKeyMetric(getApplicationModel(), serviceKey);
public void calcServiceKeyRt(String registryOpType, Long responseTime, Metric key) {
for (LongContainer container : rtStats.get(registryOpType)) {
Number current = (Number) container.get(key);
if (current == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,37 @@ public void initWrapper(List<MetricsKeyWrapper> metricsKeyWrappers) {
}

public void incrementServiceKey(MetricsKeyWrapper wrapper, String serviceKey, int size) {
incrementExtraServiceKey(wrapper, serviceKey, null, size);
}

public void incrementExtraServiceKey(MetricsKeyWrapper wrapper, String serviceKey, Map<String, String> extra, int size) {
if (!serviceWrapperNumStats.containsKey(wrapper)) {
return;
}
serviceWrapperNumStats.get(wrapper).computeIfAbsent(new ServiceKeyMetric(getApplicationModel(), serviceKey), k -> new AtomicLong(0L)).getAndAdd(size);
ServiceKeyMetric serviceKeyMetric = new ServiceKeyMetric(getApplicationModel(), serviceKey);
if (extra != null) {
serviceKeyMetric.setExtraInfo(extra);
}
serviceWrapperNumStats.get(wrapper).computeIfAbsent(serviceKeyMetric, k -> new AtomicLong(0L)).getAndAdd(size);
// MetricsSupport.fillZero(serviceWrapperNumStats);
}

public void setServiceKey(MetricsKeyWrapper wrapper, String serviceKey, int num) {
setExtraServiceKey(wrapper, serviceKey, num, null);
}

public void setExtraServiceKey(MetricsKeyWrapper wrapper, String serviceKey, int num, Map<String, String> extra) {
if (!serviceWrapperNumStats.containsKey(wrapper)) {
return;
}
serviceWrapperNumStats.get(wrapper).computeIfAbsent(new ServiceKeyMetric(getApplicationModel(), serviceKey), k -> new AtomicLong(0L)).set(num);
// MetricsSupport.fillZero(serviceWrapperNumStats);
ServiceKeyMetric serviceKeyMetric = new ServiceKeyMetric(getApplicationModel(), serviceKey);
if (extra != null) {
serviceKeyMetric.setExtraInfo(extra);
}
serviceWrapperNumStats.get(wrapper).computeIfAbsent(serviceKeyMetric, k -> new AtomicLong(0L)).set(num);
}

@Override
public List<MetricSample> export(MetricsCategory category) {
List<MetricSample> list = new ArrayList<>();
for (MetricsKeyWrapper wrapper : serviceWrapperNumStats.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.dubbo.metrics.model.key.TypeWrapper;
import org.apache.dubbo.rpc.model.ApplicationModel;

import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;

Expand All @@ -40,7 +41,7 @@ public abstract class MetricsEvent {
private final String appName;
private final MetricsDispatcher metricsDispatcher;

private final Map<String, Object> attachment = new IdentityHashMap<>(8);
private final Map<String, Object> attachments = new IdentityHashMap<>(8);

public MetricsEvent(ApplicationModel source, TypeWrapper typeWrapper) {
this(source, null, null, typeWrapper);
Expand Down Expand Up @@ -82,11 +83,19 @@ public <T> T getAttachmentValue(String key) {
if (key == null) {
throw new MetricsNeverHappenException("Attachment key is null");
}
return (T) attachment.get(key);
return (T) attachments.get(key);
}

public Map<String, Object> getAttachments() {
return Collections.unmodifiableMap(attachments);
}

public void putAttachment(String key, Object value) {
attachment.put(key, value);
attachments.put(key, value);
}

public void putAttachments(Map<String, String> attachments) {
this.attachments.putAll(attachments);
}

public void setAvailable(boolean available) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
/**
* According to the event template of {@link MetricsEventBus},
* build a consistent static method for general and custom monitoring consume methods
*
*/
public abstract class AbstractMetricsKeyListener extends AbstractMetricsListener<TimeCounterEvent> implements MetricsLifeListener<TimeCounterEvent> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public abstract class AbstractMetricsListener<E extends MetricsEvent> implements
private final Map<Integer, Boolean> eventMatchCache = new ConcurrentHashMap<>();

/**
* Whether to support the general determination of event points depends on the event type
* Only interested in events of the current listener's generic parameter type
*/
public boolean isSupport(MetricsEvent event) {
Boolean eventMatch = eventMatchCache.get(System.identityHashCode(event.getClass()));
Expand Down
Loading

0 comments on commit 0add70e

Please sign in to comment.