diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerPublisher.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerPublisher.java index ec13586..c990d97 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerPublisher.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerPublisher.java @@ -2,6 +2,7 @@ import eu.nebulouscloud.exn.Connector; import eu.nebulouscloud.exn.core.Publisher; +import eu.nebulouscloud.exn.handlers.ConnectorHandler; import eu.nebulouscloud.exn.settings.StaticExnConfig; import lombok.extern.slf4j.Slf4j; import org.json.simple.JSONObject; @@ -19,6 +20,7 @@ public class BrokerPublisher { private ArrayList publishers = new ArrayList<>(); private Connector active_connector; + private CustomConnectorHandler active_connector_handler; private String topic; private String broker_ip; private int broker_port; @@ -53,7 +55,17 @@ public BrokerPublisher(String topic, String broker_ip, int broker_port, String b log.info("Publisher configuration changed, creating new connector at "+broker_ip+" for topic "+topic); if (active_connector!=null) { //active_connector.stop(new ArrayList<>(), publishers); - active_connector.stop(); + synchronized (active_connector_handler.getReady()){ + while (!active_connector_handler.getReady().get()) { + try { + active_connector_handler.wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + active_connector.stop(); + } + } publishers.clear(); //for (String broker_topic : broker_and_topics_to_publish_to.get(broker_ip)){ @@ -69,9 +81,9 @@ public BrokerPublisher(String topic, String broker_ip, int broker_port, String b //} //} //CustomConnectorHandler custom_handler = new CustomConnectorHandler(); - + active_connector_handler = new CustomConnectorHandler() {}; active_connector = new Connector("resource_manager" - , new CustomConnectorHandler() {} + , active_connector_handler , publishers , List.of(), false, diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/CustomConnectorHandler.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/CustomConnectorHandler.java index 6959a21..ff4b604 100644 --- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/CustomConnectorHandler.java +++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/CustomConnectorHandler.java @@ -3,12 +3,19 @@ import eu.nebulouscloud.exn.core.Context; import eu.nebulouscloud.exn.handlers.ConnectorHandler; +import java.util.concurrent.atomic.AtomicBoolean; + public class CustomConnectorHandler extends ConnectorHandler { private Context context; + private final AtomicBoolean ready = new AtomicBoolean(false); @Override public void onReady(Context context) { this.context = context; + synchronized (ready) { + this.ready.set(true); + this.ready.notify(); + } } public void remove_consumer_with_key(String key){ context.unregisterConsumer(key); @@ -21,4 +28,8 @@ public Context getContext() { public void setContext(Context context) { this.context = context; } + + public AtomicBoolean getReady() { + return ready; + } }