Skip to content

Commit

Permalink
Merge pull request #920 from PavelAnikeichyk/develop
Browse files Browse the repository at this point in the history
Fixed issue #850 - MQTT Con thread leak on connection failure
  • Loading branch information
rdasgupt authored Aug 6, 2022
2 parents 3daa4dd + a356442 commit 0941287
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
Expand All @@ -41,6 +42,8 @@
import org.eclipse.paho.client.mqttv3.logging.Logger;
import org.eclipse.paho.client.mqttv3.logging.LoggerFactory;

import static org.eclipse.paho.client.mqttv3.internal.CommsSender.MAX_STOPPED_STATE_TO_STOP_THREAD;

/**
* Bridge between Receiver and the external API. This class gets called by
* Receiver, and then converts the comms-centric MQTT message objects into ones
Expand Down Expand Up @@ -113,6 +116,18 @@ public void start(String threadName, ExecutorService executorService) {
}
}
}

AtomicInteger stoppedStateCounter = new AtomicInteger(0);
while (!isRunning()) {
try { Thread.sleep(100); } catch (Exception e) { }
if (current_state == State.STOPPED) {
if (stoppedStateCounter.incrementAndGet() > MAX_STOPPED_STATE_TO_STOP_THREAD) {
break;
}
} else {
stoppedStateCounter.set(0);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttToken;
Expand All @@ -32,6 +33,8 @@
import org.eclipse.paho.client.mqttv3.logging.Logger;
import org.eclipse.paho.client.mqttv3.logging.LoggerFactory;

import static org.eclipse.paho.client.mqttv3.internal.CommsSender.MAX_STOPPED_STATE_TO_STOP_THREAD;

/**
* Receives MQTT packets from the server.
*/
Expand Down Expand Up @@ -85,6 +88,18 @@ public void start(String threadName, ExecutorService executorService) {
}
}
}

AtomicInteger stoppedStateCounter = new AtomicInteger(0);
while (!isRunning()) {
try { Thread.sleep(100); } catch (Exception e) { }
if (current_state == State.STOPPED) {
if (stoppedStateCounter.incrementAndGet() > MAX_STOPPED_STATE_TO_STOP_THREAD) {
break;
}
} else {
stoppedStateCounter.set(0);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttToken;
Expand All @@ -35,6 +36,8 @@ public class CommsSender implements Runnable {
private static final String CLASS_NAME = CommsSender.class.getName();
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);

public static final int MAX_STOPPED_STATE_TO_STOP_THREAD = 300; // 30 seconds

//Sends MQTT packets to the server on its own thread
private enum State {STOPPED, RUNNING, STARTING}

Expand Down Expand Up @@ -80,6 +83,18 @@ public void start(String threadName, ExecutorService executorService) {
}
}
}

AtomicInteger stoppedStateCounter = new AtomicInteger(0);
while (!isRunning()) {
try { Thread.sleep(100); } catch (Exception e) { }
if (current_state == State.STOPPED) {
if (stoppedStateCounter.incrementAndGet() > MAX_STOPPED_STATE_TO_STOP_THREAD) {
break;
}
} else {
stoppedStateCounter.set(0);
}
}
}

/**
Expand Down

0 comments on commit 0941287

Please sign in to comment.