diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java index eb415b9f..0f885a4e 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java @@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.paho.client.mqttv3.IMqttActionListener; import org.eclipse.paho.client.mqttv3.IMqttMessageListener; @@ -41,6 +42,8 @@ import org.eclipse.paho.client.mqttv3.logging.Logger; import org.eclipse.paho.client.mqttv3.logging.LoggerFactory; +import static org.eclipse.paho.client.mqttv3.internal.CommsSender.MAX_STOPPED_STATE_TO_STOP_THREAD; + /** * Bridge between Receiver and the external API. This class gets called by * Receiver, and then converts the comms-centric MQTT message objects into ones @@ -113,6 +116,18 @@ public void start(String threadName, ExecutorService executorService) { } } } + + AtomicInteger stoppedStateCounter = new AtomicInteger(0); + while (!isRunning()) { + try { Thread.sleep(100); } catch (Exception e) { } + if (current_state == State.STOPPED) { + if (stoppedStateCounter.incrementAndGet() > MAX_STOPPED_STATE_TO_STOP_THREAD) { + break; + } + } else { + stoppedStateCounter.set(0); + } + } } /** diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsReceiver.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsReceiver.java index 0a608bfc..60eba5fc 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsReceiver.java +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsReceiver.java @@ -20,6 +20,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttToken; @@ -32,6 +33,8 @@ import org.eclipse.paho.client.mqttv3.logging.Logger; import org.eclipse.paho.client.mqttv3.logging.LoggerFactory; +import static org.eclipse.paho.client.mqttv3.internal.CommsSender.MAX_STOPPED_STATE_TO_STOP_THREAD; + /** * Receives MQTT packets from the server. */ @@ -85,6 +88,18 @@ public void start(String threadName, ExecutorService executorService) { } } } + + AtomicInteger stoppedStateCounter = new AtomicInteger(0); + while (!isRunning()) { + try { Thread.sleep(100); } catch (Exception e) { } + if (current_state == State.STOPPED) { + if (stoppedStateCounter.incrementAndGet() > MAX_STOPPED_STATE_TO_STOP_THREAD) { + break; + } + } else { + stoppedStateCounter.set(0); + } + } } /** diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsSender.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsSender.java index cd886620..51c8a2f6 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsSender.java +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsSender.java @@ -20,6 +20,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttToken; @@ -35,6 +36,8 @@ public class CommsSender implements Runnable { private static final String CLASS_NAME = CommsSender.class.getName(); private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME); + public static final int MAX_STOPPED_STATE_TO_STOP_THREAD = 300; // 30 seconds + //Sends MQTT packets to the server on its own thread private enum State {STOPPED, RUNNING, STARTING} @@ -80,6 +83,18 @@ public void start(String threadName, ExecutorService executorService) { } } } + + AtomicInteger stoppedStateCounter = new AtomicInteger(0); + while (!isRunning()) { + try { Thread.sleep(100); } catch (Exception e) { } + if (current_state == State.STOPPED) { + if (stoppedStateCounter.incrementAndGet() > MAX_STOPPED_STATE_TO_STOP_THREAD) { + break; + } + } else { + stoppedStateCounter.set(0); + } + } } /**