Skip to content

Commit

Permalink
[openhabcloud] Reconnection Fixes (#14251)
Browse files Browse the repository at this point in the history
* [openhabcloud] Possible connection leak
* Creates thread safe reconnection, reduces unnecessary polling on setup, removes unused variables.
* adds the reconnect settings to the internal socket.io options.
* Up the min reconnect time
* Use @ssalonen sugestion for backoff mins and randomness
* Reconnect after server initiated disconnect
* Remove unhelpful comments

Signed-off-by: Dan Cunningham <dan@digitaldan.com>
  • Loading branch information
digitaldan authored Jan 29, 2023
1 parent 16f3a3e commit 9ba3c07
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.jetty.client.HttpClient;
Expand All @@ -51,6 +54,7 @@
import io.socket.client.Socket;
import io.socket.emitter.Emitter;
import io.socket.engineio.client.Transport;
import io.socket.engineio.client.transports.WebSocket;
import io.socket.parser.Packet;
import io.socket.parser.Parser;
import okhttp3.OkHttpClient.Builder;
Expand All @@ -68,6 +72,15 @@
* @author Kai Kreuzer - migrated code to new Jetty client and ESH APIs
*/
public class CloudClient {

private static final long RECONNECT_MIN = 2_000;

private static final long RECONNECT_MAX = 60_000;

private static final double RECONNECT_JITTER = 0.75;

private static final long READ_TIMEOUT = 60_0000;

/*
* Logger for this class
*/
Expand Down Expand Up @@ -108,11 +121,6 @@ public class CloudClient {
*/
private boolean isConnected;

/*
* This variable holds version of local openHAB
*/
private String openHABVersion;

/*
* This variable holds instance of Socket.IO client class which provides communication
* with the openHAB Cloud
Expand All @@ -139,11 +147,15 @@ public class CloudClient {

/*
* Delay reconnect scheduler pool
*
*
*/
protected final ScheduledExecutorService scheduler = ThreadPoolManager
.getScheduledPool(ThreadPoolManager.THREAD_POOL_NAME_COMMON);

@SuppressWarnings("null")
private final AtomicReference<Optional<ScheduledFuture<?>>> reconnectFuture = new AtomicReference<>(
Optional.empty());

/**
* Constructor of CloudClient
*
Expand All @@ -161,9 +173,9 @@ public CloudClient(HttpClient httpClient, String uuid, String secret, String bas
this.remoteAccessEnabled = remoteAccessEnabled;
this.exposedItems = exposedItems;
this.jettyClient = httpClient;
reconnectBackoff.setMin(1000);
reconnectBackoff.setMax(30_000);
reconnectBackoff.setJitter(0.5);
reconnectBackoff.setMin(RECONNECT_MIN);
reconnectBackoff.setMax(RECONNECT_MAX);
reconnectBackoff.setJitter(RECONNECT_JITTER);
}

/**
Expand All @@ -173,17 +185,25 @@ public CloudClient(HttpClient httpClient, String uuid, String secret, String bas
public void connect() {
try {
Options options = new Options();
options.transports = new String[] { WebSocket.NAME };
options.reconnection = true;
options.reconnectionAttempts = Integer.MAX_VALUE;
options.reconnectionDelay = RECONNECT_MIN;
options.reconnectionDelayMax = RECONNECT_MAX;
options.randomizationFactor = RECONNECT_JITTER;
options.timeout = READ_TIMEOUT;
Builder okHttpBuilder = new Builder();
okHttpBuilder.readTimeout(READ_TIMEOUT, TimeUnit.MILLISECONDS);
if (logger.isTraceEnabled()) {
// When trace level logging is enabled, we activate further logging of HTTP calls
// of the Socket.IO library
Builder okHttpBuilder = new Builder();
HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
loggingInterceptor.setLevel(Level.BASIC);
okHttpBuilder.addInterceptor(loggingInterceptor);
okHttpBuilder.addNetworkInterceptor(loggingInterceptor);
options.callFactory = okHttpBuilder.build();
options.webSocketFactory = okHttpBuilder.build();
}
options.callFactory = okHttpBuilder.build();
options.webSocketFactory = okHttpBuilder.build();
socket = IO.socket(baseURL, options);
URL parsed = new URL(baseURL);
protocol = parsed.getProtocol();
Expand Down Expand Up @@ -273,13 +293,17 @@ public void call(Object... args) {
.on(Socket.EVENT_RECONNECT_FAILED,
args -> logger.debug("Socket.IO re-connect attempts failed. Stopping reconnection."))//
.on(Socket.EVENT_DISCONNECT, args -> {
if (args.length > 0) {
logger.warn("Socket.IO disconnected: {}", args[0]);
} else {
logger.warn("Socket.IO disconnected");
}
String message = args.length > 0 ? args[0].toString() : "";
logger.warn("Socket.IO disconnected: {}", message);
isConnected = false;
onDisconnect();
// https://github.com/socketio/socket.io-client/commit/afb952d854e1d8728ce07b7c3a9f0dee2a61ef4e
if ("io server disconnect".equals(message)) {
socket.close();
long delay = reconnectBackoff.duration();
logger.warn("Reconnecting after {} ms.", delay);
scheduleReconnect(delay);
}
})//
.on(Socket.EVENT_ERROR, args -> {
if (CloudClient.this.socket.connected()) {
Expand Down Expand Up @@ -325,12 +349,7 @@ public void call(Object... args) {
logger.warn("Error connecting to the openHAB Cloud instance. Reconnecting.");
}
socket.close();
scheduler.schedule(new Runnable() {
@Override
public void run() {
socket.connect();
}
}, delay, TimeUnit.MILLISECONDS);
scheduleReconnect(delay);
}
})//

Expand Down Expand Up @@ -671,21 +690,23 @@ public boolean isConnected() {
*/
public void shutdown() {
logger.info("Shutting down openHAB Cloud service connection");
reconnectFuture.get().ifPresent(future -> future.cancel(true));
socket.disconnect();
}

public String getOpenHABVersion() {
return openHABVersion;
}

public void setOpenHABVersion(String openHABVersion) {
this.openHABVersion = openHABVersion;
}

public void setListener(CloudClientListener listener) {
this.listener = listener;
}

private void scheduleReconnect(long delay) {
reconnectFuture.getAndSet(Optional.of(scheduler.schedule(new Runnable() {
@Override
public void run() {
socket.connect();
}
}, delay, TimeUnit.MILLISECONDS))).ifPresent(future -> future.cancel(true));
}

private JSONObject getJSONHeaders(HttpFields httpFields) {
JSONObject headersJSON = new JSONObject();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ protected void modified(Map<String, ?> config) {
String localBaseUrl = "http://localhost:" + localPort;
cloudClient = new CloudClient(httpClient, InstanceUUID.get(), getSecret(), cloudBaseUrl, localBaseUrl,
remoteAccessEnabled, exposedItems);
cloudClient.setOpenHABVersion(OpenHAB.getVersion());
cloudClient.connect();
cloudClient.setListener(this);
NotificationAction.cloudService = this;
Expand Down

0 comments on commit 9ba3c07

Please sign in to comment.