Skip to content

Commit

Permalink
:fix: cleanup event bus subscription
Browse files Browse the repository at this point in the history
Signed-off-by: riccardomodanese <riccardo.modanese@eurotech.com>
  • Loading branch information
riccardomodanese committed Oct 21, 2024
1 parent 8916b3e commit be8bb3f
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.eclipse.kapua.commons.service.event.store.internal.EventStoreRecordImplJpaRepository;
import org.eclipse.kapua.commons.service.event.store.internal.EventStoreServiceImpl;
import org.eclipse.kapua.event.ServiceEventBus;
import org.eclipse.kapua.event.Subscription;
import org.eclipse.kapua.locator.KapuaLocator;
import org.eclipse.kapua.service.authorization.AuthorizationService;
import org.eclipse.kapua.service.authorization.permission.PermissionFactory;
Expand Down Expand Up @@ -82,7 +83,7 @@ public void start() throws KapuaException {
}
// Listen to upstream service events
if (selc.getEventListener() != null) {
eventbus.subscribe(address, getSubscriptionName(address, selc.getClientName()), selc.getEventListener());
eventbus.subscribe(new Subscription(address, getSubscriptionName(address, selc.getClientName()), selc.getEventListener()));
}
servicesEntryList.add(new ServiceEntry(selc.getClientName(), address));
subscriberNames.add(selc.getClientName()); // Set because names must be unique
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.kapua.KapuaException;
import org.eclipse.kapua.commons.core.ServiceModule;
import org.eclipse.kapua.event.ServiceEventBus;
import org.eclipse.kapua.event.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -80,7 +81,7 @@ public void start() throws KapuaException {
}
// Listen to upstream service events
if (selc.getEventListener() != null) {
serviceEventBus.subscribe(address, getSubscriptionName(address, selc.getClientName()), selc.getEventListener());
serviceEventBus.subscribe(new Subscription(address, getSubscriptionName(address, selc.getClientName()), selc.getEventListener()));
}
servicesEntryList.add(new ServiceEntry(selc.getClientName(), address));
subscriberNames.add(selc.getClientName()); // Set because names must be unique
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.eclipse.kapua.event.ServiceEvent;
import org.eclipse.kapua.event.ServiceEventBus;
import org.eclipse.kapua.event.ServiceEventBusException;
import org.eclipse.kapua.event.ServiceEventBusListener;
import org.eclipse.kapua.event.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -126,10 +126,8 @@ public void publish(String address, ServiceEvent kapuaEvent)
}

@Override
public synchronized void subscribe(String address, String name, final ServiceEventBusListener kapuaEventListener)
throws ServiceEventBusException {
public synchronized void subscribe(Subscription subscription) throws ServiceEventBusException {
try {
Subscription subscription = new Subscription(address, name, kapuaEventListener);
subscriptionList.add(subscription);
eventBusJMSConnectionBridge.subscribe(subscription);
} catch (ServiceEventBusException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.eclipse.kapua.event.ServiceEventBus;
import org.eclipse.kapua.event.ServiceEventBusException;
import org.eclipse.kapua.event.ServiceEventBusListener;
import org.eclipse.kapua.event.Subscription;
import org.eclipse.kapua.locator.KapuaLocator;
import org.eclipse.kapua.service.KapuaService;

Expand Down Expand Up @@ -103,6 +104,6 @@ public EntityManagerSession getEntityManagerSession() {
* @since 1.0.0kapua-sew
*/
protected void registerEventListener(@NotNull ServiceEventBusListener listener, @NotNull String address, @NotNull Class<? extends KapuaService> clazz) throws ServiceEventBusException {
serviceEventBus.subscribe(address, clazz.getName(), listener);
serviceEventBus.subscribe(new Subscription(address, clazz.getName(), listener));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.eclipse.kapua.event.ServiceEvent;
import org.eclipse.kapua.event.ServiceEventBus;
import org.eclipse.kapua.event.ServiceEventBusException;
import org.eclipse.kapua.event.ServiceEventBusListener;
import org.eclipse.kapua.event.Subscription;

import javax.inject.Singleton;

Expand Down Expand Up @@ -59,7 +59,7 @@ public void publish(String address, ServiceEvent event) throws ServiceEventBusEx
}

@Override
public void subscribe(String address, String name, ServiceEventBusListener eventListener) throws ServiceEventBusException {
public void subscribe(Subscription subscription) throws ServiceEventBusException {
//Nothing to do!
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.eclipse.kapua.event.ServiceEvent;
import org.eclipse.kapua.event.ServiceEventBus;
import org.eclipse.kapua.event.ServiceEventBusException;
import org.eclipse.kapua.event.ServiceEventBusListener;
import org.eclipse.kapua.event.Subscription;
import org.eclipse.kapua.locator.guice.OverridingModule;

import javax.inject.Named;
Expand Down Expand Up @@ -74,7 +74,7 @@ public void publish(String address, ServiceEvent event) throws ServiceEventBusEx
}

@Override
public void subscribe(String address, String name, ServiceEventBusListener eventListener) throws ServiceEventBusException {
public void subscribe(Subscription subscription) throws ServiceEventBusException {
//Nothing to do!
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ public interface ServiceEventBus {
/**
* Subscribe for a specific address event
*
* @param address address to listen for events
* @param name subscriber name. It's used to share events between multiple instances of the same consumer.
* @param serviceEventBusListener listener to invoke when an event is received
* @param subscription
* @throws ServiceEventBusException
*/
void subscribe(String address, String name, ServiceEventBusListener serviceEventBusListener) throws ServiceEventBusException;
void subscribe(Subscription subscription) throws ServiceEventBusException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,20 @@
* Contributors:
* Eurotech - initial API and implementation
*******************************************************************************/
package org.eclipse.kapua.commons.event.jms;

import org.eclipse.kapua.event.ServiceEventBusListener;
package org.eclipse.kapua.event;

public class Subscription {

private String name;
private String address;
private ServiceEventBusListener kapuaEventListener;

/**
*
* @param address address to listen for events
* @param name subscriber name. It's used to share events between multiple instances of the same consumer.
* @param serviceEventBusListener listener to invoke when an event is received
*/
public Subscription(String address, String name, ServiceEventBusListener kapuaEventListener) {
this.name = name;
this.address = address;
Expand All @@ -38,4 +42,7 @@ public ServiceEventBusListener getKapuaEventListener() {
return kapuaEventListener;
}

public void updateAddress(String pattern) {
address = String.format(pattern, address);
}
}

0 comments on commit be8bb3f

Please sign in to comment.