Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #10079, address notification issue with service discovery multi subscription #10080

Merged
merged 5 commits into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
Original file line number Diff line number Diff line change
Expand Up @@ -1402,8 +1402,8 @@ public String getProtocolServiceKey() {
}
this.protocolServiceKey = getServiceKey();
/*
Special treatment if this is a consumer subscription url instance with no protocol specified - starts with 'consumer://'
If the specific protocol is specified on the consumer side, then this method will return as normal.
Special treatment for urls begins with 'consumer://', that is, a consumer subscription url instance with no protocol specified.
If protocol is specified on the consumer side, then this method will return as normal.
*/
if (!CONSUMER.equals(getProtocol())) {
this.protocolServiceKey += (GROUP_CHAR_SEPARATOR + getProtocol());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,33 +204,30 @@ public synchronized void addListenerAndNotify(String serviceKey, NotifyListener
return;
}

Set<String> protocolServiceKeys = getProtocolServiceKeyList(serviceKey, listener);
for (String protocolServiceKey : protocolServiceKeys) {
// Add to global listeners
if (!this.listeners.containsKey(serviceKey)) {
// synchronized method, no need to use DCL
this.listeners.put(serviceKey, new ConcurrentHashSet<>());
}
Set<NotifyListenerWithKey> notifyListeners = this.listeners.get(serviceKey);
notifyListeners.add(new NotifyListenerWithKey(protocolServiceKey, listener));
}

Set<NotifyListenerWithKey> notifyListeners = this.listeners.computeIfAbsent(serviceKey, _k -> new ConcurrentHashSet<>());
// {@code protocolServiceKeysToConsume} will be specific protocols configured in reference config or default protocols supported by framework.
Set<String> protocolServiceKeysToConsume = getProtocolServiceKeyList(serviceKey, listener);
// Add current listener to serviceKey set, there will have more than one listener when multiple references of one same service is configured.
NotifyListenerWithKey listenerWithKey = new NotifyListenerWithKey(serviceKey, protocolServiceKeysToConsume, listener);
notifyListeners.add(listenerWithKey);

// Aggregate address and notify on subscription.
List<URL> urls;
if (protocolServiceKeys.size() > 1) {
if (protocolServiceKeysToConsume.size() > 1) {
urls = new ArrayList<>();
for (NotifyListenerWithKey notifyListenerWithKey : this.listeners.get(serviceKey)) {
String protocolKey = notifyListenerWithKey.getProtocolServiceKey();
List<URL> urlsOfProtocol = getAddresses(protocolKey, listener.getConsumerUrl());
for (String protocolServiceKey : protocolServiceKeysToConsume) {
List<URL> urlsOfProtocol = getAddresses(protocolServiceKey, listener.getConsumerUrl());
if (CollectionUtils.isNotEmpty(urlsOfProtocol)) {
logger.info(String.format("Found %s urls of protocol service key %s ", urlsOfProtocol.size(), protocolServiceKey));
urls.addAll(urlsOfProtocol);
}
}
} else {
String protocolKey = this.listeners.get(serviceKey).iterator().next().getProtocolServiceKey();
urls = getAddresses(protocolKey, listener.getConsumerUrl());
urls = getAddresses(protocolServiceKeysToConsume.iterator().next(), listener.getConsumerUrl());
}

if (CollectionUtils.isNotEmpty(urls)) {
logger.info(String.format("Notify serviceKey: %s, listener: %s with %s urls on subscription", serviceKey, listener, urls.size()));
listener.notify(urls);
}
}
Expand All @@ -240,18 +237,16 @@ public synchronized void removeListener(String serviceKey, NotifyListener notify
return;
}

for (String protocolServiceKey : getProtocolServiceKeyList(serviceKey, notifyListener)) {
// synchronized method, no need to use DCL
Set<NotifyListenerWithKey> notifyListeners = this.listeners.get(serviceKey);
if (notifyListeners != null) {
NotifyListenerWithKey listenerWithKey = new NotifyListenerWithKey(protocolServiceKey, notifyListener);
// Remove from global listeners
notifyListeners.remove(listenerWithKey);
// synchronized method, no need to use DCL
Set<NotifyListenerWithKey> notifyListeners = this.listeners.get(serviceKey);
if (notifyListeners != null) {
NotifyListenerWithKey listenerWithKey = new NotifyListenerWithKey(serviceKey, notifyListener);
// Remove from global listeners
notifyListeners.remove(listenerWithKey);

// ServiceKey has no listener, remove set
if (notifyListeners.size() == 0) {
this.listeners.remove(serviceKey);
}
// ServiceKey has no listener, remove set
if (notifyListeners.size() == 0) {
this.listeners.remove(serviceKey);
}
}
}
Expand Down Expand Up @@ -385,32 +380,32 @@ protected List<URL> getAddresses(String serviceProtocolKey, URL consumerURL) {
* race condition is protected by onEvent/doOnEvent
*/
protected void notifyAddressChanged() {
// 1 different services
listeners.forEach((serviceKey, listenerSet) -> {
if (listenerSet != null) {
if (listenerSet.size() == 1) {
NotifyListenerWithKey listenerWithKey = listenerSet.iterator().next();
String protocolServiceKey = listenerWithKey.getProtocolServiceKey();
NotifyListener notifyListener = listenerWithKey.getNotifyListener();
// 2 multiple subscription listener of the same service
for (NotifyListenerWithKey listenerWithKey : listenerSet) {
NotifyListener notifyListener = listenerWithKey.getNotifyListener();
if (listenerWithKey.getProtocolServiceKeys().size() == 1) {// 2.1 if one specific protocol is specified
String protocolServiceKey = listenerWithKey.getProtocolServiceKeys().iterator().next();
//FIXME, group wildcard match
List<URL> urls = toUrlsWithEmpty(getAddresses(protocolServiceKey, notifyListener.getConsumerUrl()));
logger.info("Notify service " + serviceKey + " with urls " + urls.size());
logger.info("Notify service " + protocolServiceKey + " with urls " + urls.size());
notifyListener.notify(urls);
} else {
} else {// 2.2 multiple protocols or no protocol(using default protocols) set
List<URL> urls = new ArrayList<>();
NotifyListener notifyListener = null;
for (NotifyListenerWithKey listenerWithKey : listenerSet) {
String protocolServiceKey = listenerWithKey.getProtocolServiceKey();
notifyListener = listenerWithKey.getNotifyListener();
int effectiveProtocolNum = 0;
for (String protocolServiceKey : listenerWithKey.getProtocolServiceKeys()) {
List<URL> tmpUrls = getAddresses(protocolServiceKey, notifyListener.getConsumerUrl());
if (CollectionUtils.isNotEmpty(tmpUrls)) {
logger.info("Found " + urls.size() + " urls of protocol service key " + protocolServiceKey);
effectiveProtocolNum++;
urls.addAll(tmpUrls);
}
}
if (notifyListener != null) {
logger.info("Notify service " + serviceKey + " with urls " + urls.size());
urls = toUrlsWithEmpty(urls);
notifyListener.notify(urls);
}

logger.info("Notify service " + serviceKey + " with " + urls.size() + " urls from " + effectiveProtocolNum + " different protocols");
urls = toUrlsWithEmpty(urls);
notifyListener.notify(urls);
}
}
});
Expand Down Expand Up @@ -477,7 +472,7 @@ public int hashCode() {
* Calculate the protocol list that the consumer cares about.
*
* @param serviceKey possible input serviceKey includes
* 1. {group}/{interface}:{version}:consumer
* 1. {group}/{interface}:{version}, if protocol is not specified
* 2. {group}/{interface}:{version}:{user specified protocols}
* @param listener listener also contains the user specified protocols
* @return protocol list with the format {group}/{interface}:{version}:{protocol}
Expand Down Expand Up @@ -530,16 +525,26 @@ public void run() {
}

public static class NotifyListenerWithKey {
private final String protocolServiceKey;
private final String serviceKey;
private final Set<String> protocolServiceKeys;
private final NotifyListener notifyListener;

public NotifyListenerWithKey(String protocolServiceKey, NotifyListener notifyListener) {
this.protocolServiceKey = protocolServiceKey;
public NotifyListenerWithKey(String protocolServiceKey, Set<String> protocolServiceKeys, NotifyListener notifyListener) {
this.serviceKey = protocolServiceKey;
this.protocolServiceKeys = (protocolServiceKeys == null ? new ConcurrentHashSet<>() : protocolServiceKeys);
this.notifyListener = notifyListener;
}

public String getProtocolServiceKey() {
return protocolServiceKey;
public NotifyListenerWithKey(String protocolServiceKey, NotifyListener notifyListener) {
this(protocolServiceKey, null, notifyListener);
}

public String getServiceKey() {
return serviceKey;
}

public Set<String> getProtocolServiceKeys() {
return protocolServiceKeys;
}

public NotifyListener getNotifyListener() {
Expand All @@ -555,12 +560,12 @@ public boolean equals(Object o) {
return false;
}
NotifyListenerWithKey that = (NotifyListenerWithKey) o;
return Objects.equals(protocolServiceKey, that.protocolServiceKey) && Objects.equals(notifyListener, that.notifyListener);
return Objects.equals(serviceKey, that.serviceKey) && Objects.equals(notifyListener, that.notifyListener);
}

@Override
public int hashCode() {
return Objects.hash(protocolServiceKey, notifyListener);
return Objects.hash(serviceKey, notifyListener);
}
}
}
Loading