Skip to content

Commit

Permalink
[gardena] Fix handling of websocket connection losses that causes mem…
Browse files Browse the repository at this point in the history
…ory leaks (#11825)

* [gardena] Fix handling of websocket connection losses that causes memory leaks

* The binding no longer restarts websockets more than once if the connection is lost

Signed-off-by: Nico Brüttner <n@bruettner.de>
  • Loading branch information
Bruetti1991 authored Jun 14, 2022
1 parent 58342f7 commit fd9fa72
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@
*/
package org.openhab.binding.gardena.internal;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -37,6 +35,7 @@
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.openhab.binding.gardena.internal.config.GardenaConfig;
import org.openhab.binding.gardena.internal.exception.GardenaDeviceNotFoundException;
import org.openhab.binding.gardena.internal.exception.GardenaException;
Expand Down Expand Up @@ -87,10 +86,10 @@ public class GardenaSmartImpl implements GardenaSmart, GardenaSmartWebSocketList
private GardenaSmartEventListener eventListener;

private HttpClient httpClient;
private List<GardenaSmartWebSocket> webSockets = new ArrayList<>();
private Map<String, GardenaSmartWebSocket> webSockets = new HashMap<>();
private @Nullable PostOAuth2Response token;
private boolean initialized = false;
private WebSocketFactory webSocketFactory;
private WebSocketClient webSocketClient;

private Set<Device> devicesToNotify = ConcurrentHashMap.newKeySet();
private @Nullable ScheduledFuture<?> deviceToNotifyFuture;
Expand All @@ -103,7 +102,6 @@ public GardenaSmartImpl(String id, GardenaConfig config, GardenaSmartEventListen
this.config = config;
this.eventListener = eventListener;
this.scheduler = scheduler;
this.webSocketFactory = webSocketFactory;

logger.debug("Starting GardenaSmart");
try {
Expand All @@ -112,6 +110,13 @@ public GardenaSmartImpl(String id, GardenaConfig config, GardenaSmartEventListen
httpClient.setIdleTimeout(httpClient.getConnectTimeout());
httpClient.start();

String webSocketId = String.valueOf(hashCode());
webSocketClient = webSocketFactory.createWebSocketClient(webSocketId);
webSocketClient.setConnectTimeout(config.getConnectionTimeout() * 1000L);
webSocketClient.setStopTimeout(3000);
webSocketClient.setMaxIdleTimeout(150000);
webSocketClient.start();

// initially load access token
verifyToken();
locationsResponse = loadLocations();
Expand All @@ -132,6 +137,10 @@ public GardenaSmartImpl(String id, GardenaConfig config, GardenaSmartEventListen

startWebsockets();
initialized = true;
} catch (GardenaException ex) {
dispose();
// pass GardenaException to calling function
throw ex;
} catch (Exception ex) {
dispose();
throw new GardenaException(ex.getMessage(), ex);
Expand All @@ -145,16 +154,16 @@ private void startWebsockets() throws Exception {
for (LocationDataItem location : locationsResponse.data) {
WebSocketCreatedResponse webSocketCreatedResponse = getWebsocketInfo(location.id);
String socketId = id + "-" + location.attributes.name;
webSockets.add(new GardenaSmartWebSocket(this, webSocketCreatedResponse, config, scheduler,
webSocketFactory, token, socketId));
webSockets.put(location.id, new GardenaSmartWebSocket(this, webSocketClient, scheduler,
webSocketCreatedResponse.data.attributes.url, token, socketId, location.id));
}
}

/**
* Stops all websockets.
*/
private void stopWebsockets() {
for (GardenaSmartWebSocket webSocket : webSockets) {
for (GardenaSmartWebSocket webSocket : webSockets.values()) {
webSocket.stop();
}
webSockets.clear();
Expand Down Expand Up @@ -203,7 +212,7 @@ private <T> T executeRequest(HttpMethod method, String url, @Nullable Object con

if (status != 200 && status != 204 && status != 201 && status != 202) {
throw new GardenaException(String.format("Error %s %s, %s", status, contentResponse.getReason(),
contentResponse.getContentAsString()));
contentResponse.getContentAsString()), status);
}

if (result == null) {
Expand Down Expand Up @@ -297,10 +306,12 @@ public void dispose() {
stopWebsockets();
try {
httpClient.stop();
webSocketClient.stop();
} catch (Exception e) {
// ignore
}
httpClient.destroy();
webSocketClient.destroy();
locationsResponse = new LocationsResponse();
allDevicesById.clear();
initialized = false;
Expand All @@ -311,12 +322,17 @@ public void dispose() {
*/
@Override
public synchronized void restartWebsockets() {
logger.debug("Restarting GardenaSmart Webservice");
logger.debug("Restarting GardenaSmart Webservices");
stopWebsockets();
try {
startWebsockets();
} catch (Exception ex) {
logger.warn("Restarting GardenaSmart Webservice failed: {}, restarting binding", ex.getMessage());
// restart binding
if (logger.isDebugEnabled()) {
logger.warn("Restarting GardenaSmart Webservices failed! Restarting binding", ex);
} else {
logger.warn("Restarting GardenaSmart Webservices failed: {}! Restarting binding", ex.getMessage());
}
eventListener.onError();
}
}
Expand Down Expand Up @@ -350,13 +366,38 @@ private void handleDataItem(final DataItem<?> dataItem) throws GardenaException
}

@Override
public void onWebSocketClose() {
restartWebsockets();
public void onWebSocketClose(String id) {
restartWebsocket(webSockets.get(id));
}

@Override
public void onWebSocketError() {
eventListener.onError();
public void onWebSocketError(String id) {
restartWebsocket(webSockets.get(id));
}

private void restartWebsocket(@Nullable GardenaSmartWebSocket socket) {
synchronized (this) {
if (socket != null && !socket.isClosing()) {
// close socket, if still open
logger.info("Restarting GardenaSmart Webservice ({})", socket.getSocketID());
socket.stop();
} else {
// if socket is already closing, exit function and do not restart socket
return;
}
}

try {
Thread.sleep(3000);
WebSocketCreatedResponse webSocketCreatedResponse = getWebsocketInfo(socket.getLocationID());
// only restart single socket, do not restart binding
socket.restart(webSocketCreatedResponse.data.attributes.url);
} catch (Exception ex) {
// restart binding on error
logger.warn("Restarting GardenaSmart Webservice failed ({}): {}, restarting binding", socket.getSocketID(),
ex.getMessage());
eventListener.onError();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.frames.PongFrame;
import org.openhab.binding.gardena.internal.config.GardenaConfig;
import org.openhab.binding.gardena.internal.model.dto.api.PostOAuth2Response;
import org.openhab.binding.gardena.internal.model.dto.api.WebSocketCreatedResponse;
import org.openhab.core.io.net.http.WebSocketFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -63,29 +60,23 @@ public class GardenaSmartWebSocket {
private ByteBuffer pingPayload = ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8));
private @Nullable PostOAuth2Response token;
private String socketId;
private String locationID;

/**
* Starts the websocket session.
*/
public GardenaSmartWebSocket(GardenaSmartWebSocketListener socketEventListener,
WebSocketCreatedResponse webSocketCreatedResponse, GardenaConfig config, ScheduledExecutorService scheduler,
WebSocketFactory webSocketFactory, @Nullable PostOAuth2Response token, String socketId) throws Exception {
public GardenaSmartWebSocket(GardenaSmartWebSocketListener socketEventListener, WebSocketClient webSocketClient,
ScheduledExecutorService scheduler, String url, @Nullable PostOAuth2Response token, String socketId,
String locationID) throws Exception {
this.socketEventListener = socketEventListener;
this.webSocketClient = webSocketClient;
this.scheduler = scheduler;
this.token = token;
this.socketId = socketId;
this.locationID = locationID;

String webSocketId = String.valueOf(hashCode());
webSocketClient = webSocketFactory.createWebSocketClient(webSocketId);
webSocketClient.setConnectTimeout(config.getConnectionTimeout() * 1000L);
webSocketClient.setStopTimeout(3000);
webSocketClient.setMaxIdleTimeout(150000);
webSocketClient.start();

session = (WebSocketSession) webSocketClient.connect(this, new URI(url)).get();
logger.debug("Connecting to Gardena Webservice ({})", socketId);
session = (WebSocketSession) webSocketClient
.connect(this, new URI(webSocketCreatedResponse.data.attributes.url)).get();
session.setStopTimeout(3000);
}

/**
Expand All @@ -97,35 +88,44 @@ public synchronized void stop() {
if (connectionTracker != null) {
connectionTracker.cancel(true);
}
if (isRunning()) {
logger.debug("Closing Gardena Webservice client ({})", socketId);
try {
session.close();
} catch (Exception ex) {
// ignore
} finally {
try {
webSocketClient.stop();
} catch (Exception e) {
// ignore
}
}

logger.debug("Closing Gardena Webservice ({})", socketId);
try {
session.close();
} catch (Exception ex) {
// ignore
}
}

/**
* Returns true, if the websocket is running.
*/
public synchronized boolean isRunning() {
return session.isOpen();
public boolean isClosing() {
return this.closing;
}

public String getSocketID() {
return this.socketId;
}

public String getLocationID() {
return this.locationID;
}

public void restart(String newUrl) throws Exception {
logger.debug("Reconnecting to Gardena Webservice ({})", socketId);
session = (WebSocketSession) webSocketClient.connect(this, new URI(newUrl)).get();
}

@OnWebSocketConnect
public void onConnect(Session session) {
closing = false;
logger.debug("Connected to Gardena Webservice ({})", socketId);

connectionTracker = scheduler.scheduleWithFixedDelay(this::sendKeepAlivePing, 2, 2, TimeUnit.MINUTES);
ScheduledFuture<?> connectionTracker = this.connectionTracker;
if (connectionTracker != null && !connectionTracker.isCancelled()) {
connectionTracker.cancel(false);
}

// start sending PING every two minutes
this.connectionTracker = scheduler.scheduleWithFixedDelay(this::sendKeepAlivePing, 2, 2, TimeUnit.MINUTES);
}

@OnWebSocketFrame
Expand All @@ -138,19 +138,22 @@ public void onFrame(Frame pong) {

@OnWebSocketClose
public void onClose(int statusCode, String reason) {
logger.debug("Connection to Gardena Webservice was closed ({}): code: {}, reason: {}", socketId, statusCode,
reason);

if (!closing) {
logger.debug("Connection to Gardena Webservice was closed ({}): code: {}, reason: {}", socketId, statusCode,
reason);
socketEventListener.onWebSocketClose();
// let listener handle restart of socket
socketEventListener.onWebSocketClose(locationID);
}
}

@OnWebSocketError
public void onError(Throwable cause) {
logger.debug("Gardena Webservice error ({})", socketId, cause); // log whole stack trace

if (!closing) {
logger.warn("Gardena Webservice error ({}): {}, restarting", socketId, cause.getMessage());
logger.debug("{}", cause.getMessage(), cause);
socketEventListener.onWebSocketError();
// let listener handle restart of socket
socketEventListener.onWebSocketError(locationID);
}
}

Expand All @@ -166,16 +169,19 @@ public void onMessage(String msg) {
* Sends a ping to tell the Gardena smart system that the client is alive.
*/
private void sendKeepAlivePing() {
try {
logger.trace("Sending ping ({})", socketId);
session.getRemote().sendPing(pingPayload);
final PostOAuth2Response accessToken = token;
if ((Instant.now().getEpochSecond() - lastPong.getEpochSecond() > WEBSOCKET_IDLE_TIMEOUT)
|| accessToken == null || accessToken.isAccessTokenExpired()) {
session.close(1000, "Timeout manually closing dead connection (" + socketId + ")");
final PostOAuth2Response accessToken = token;
if ((Instant.now().getEpochSecond() - lastPong.getEpochSecond() > WEBSOCKET_IDLE_TIMEOUT) || accessToken == null
|| accessToken.isAccessTokenExpired()) {
session.close(1000, "Timeout manually closing dead connection (" + socketId + ")");
} else {
if (session.isOpen()) {
try {
logger.trace("Sending ping ({})", socketId);
session.getRemote().sendPing(pingPayload);
} catch (IOException ex) {
logger.debug("Error while sending ping: {}", ex.getMessage());
}
}
} catch (IOException ex) {
logger.debug("{}", ex.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ public interface GardenaSmartWebSocketListener {
/**
* This method is called, when the evenRunner stops abnormally (statuscode <> 1000).
*/
void onWebSocketClose();
void onWebSocketClose(String id);

/**
* This method is called when the Gardena websocket services throws an onError.
*/
void onWebSocketError();
void onWebSocketError(String id);

/**
* This method is called, whenever a new event comes from the Gardena service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,39 @@
public class GardenaException extends IOException {

private static final long serialVersionUID = 8568935118878542270L;
private int status; // http status

public GardenaException(String message) {
super(message);
this.status = -1;
}

public GardenaException(Throwable ex) {
super(ex);
this.status = -1;
}

public GardenaException(@Nullable String message, Throwable cause) {
super(message, cause);
this.status = -1;
}

public GardenaException(String message, int status) {
super(message);
this.status = status;
}

public GardenaException(Throwable ex, int status) {
super(ex);
this.status = status;
}

public GardenaException(@Nullable String message, Throwable cause, int status) {
super(message, cause);
this.status = status;
}

public int getStatus() {
return this.status;
}
}
Loading

0 comments on commit fd9fa72

Please sign in to comment.