Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[deconz] Fix missing re-connect on dying websocket connections #34

Merged
merged 3 commits into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void bridgeStatusChanged(ThingStatusInfo bridgeStatusInfo) {
return;
}

final WebSocketConnection webSocketConnection = bridgeHandler.getWebsocketConnection();
final WebSocketConnection webSocketConnection = bridgeHandler.getWebSocketConnection();
this.connection = webSocketConnection;

updateStatus(ThingStatus.UNKNOWN, ThingStatusDetail.NONE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,21 @@
*/
@NonNullByDefault
public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketConnectionListener {
public static final Set<ThingTypeUID> SUPPORTED_THING_TYPES = Collections.singleton(BRIDGE_TYPE);
public static final Set<ThingTypeUID> SUPPORTED_THING_TYPES = Set.of(BRIDGE_TYPE);

private static final int WEBSOCKET_WATCHDOG_INTERVAL = 120; // in s

private final Logger logger = LoggerFactory.getLogger(DeconzBridgeHandler.class);
private final WebSocketConnection websocket;
private final AsyncHttpClient http;
private final WebSocketFactory webSocketFactory;
private DeconzBridgeConfig config = new DeconzBridgeConfig();
private final Gson gson;
private @Nullable ScheduledFuture<?> scheduledFuture;
private @Nullable ScheduledFuture<?> connectionJob;
private int websocketPort = 0;
/** Prevent a dispose/init cycle while this flag is set. Use for property updates */
private boolean ignoreConfigurationUpdate;
private boolean thingDisposing = false;
private WebSocketConnection webSocketConnection;

private final ExpiringCacheAsync<Optional<BridgeFullState>> fullStateCache = new ExpiringCacheAsync<>(1000);

Expand All @@ -84,13 +87,19 @@ public DeconzBridgeHandler(Bridge thing, WebSocketFactory webSocketFactory, Asyn
super(thing);
this.http = http;
this.gson = gson;
this.webSocketFactory = webSocketFactory;
this.webSocketConnection = createNewWebSocketConnection();
}

private WebSocketConnection createNewWebSocketConnection() {
String websocketID = thing.getUID().getAsString().replace(':', '-');
if (websocketID.length() < 4) {
websocketID = "openHAB-deconz-" + websocketID;
} else if (websocketID.length() > 20) {
websocketID = websocketID.substring(websocketID.length() - 20);
}
this.websocket = new WebSocketConnection(this, webSocketFactory.createWebSocketClient(websocketID), gson);
return new WebSocketConnection(this, webSocketFactory.createWebSocketClient(websocketID), gson,
WEBSOCKET_WATCHDOG_INTERVAL);
}

@Override
Expand All @@ -113,10 +122,10 @@ public void handleCommand(ChannelUID channelUID, Command command) {
* Stops the API request or websocket reconnect timer
*/
private void stopTimer() {
ScheduledFuture<?> future = scheduledFuture;
ScheduledFuture<?> future = connectionJob;
if (future != null) {
future.cancel(true);
scheduledFuture = null;
connectionJob = null;
}
}

Expand All @@ -134,7 +143,7 @@ private void parseAPIKeyResponse(AsyncHttpClient.Result r) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_PENDING,
"Allow authentication for 3rd party apps. Trying again in " + POLL_FREQUENCY_SEC + " seconds");
stopTimer();
scheduledFuture = scheduler.schedule(this::requestApiKey, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
connectionJob = scheduler.schedule(this::requestApiKey, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
} else if (r.getResponseCode() == 200) {
ApiKeyMessage[] response = Objects.requireNonNull(gson.fromJson(r.getBody(), ApiKeyMessage[].class));
if (response.length == 0) {
Expand Down Expand Up @@ -227,11 +236,11 @@ public void initializeBridgeState() {

// Use requested websocket port if no specific port is given
websocketPort = config.port == 0 ? state.config.websocketport : config.port;
startWebsocket();
startWebSocketConnection();
}, () -> {
// initial response was empty, re-trying in POLL_FREQUENCY_SEC seconds
if (!thingDisposing) {
scheduledFuture = scheduler.schedule(this::initializeBridgeState, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
connectionJob = scheduler.schedule(this::initializeBridgeState, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
}
})).exceptionally(e -> {
if (e != null) {
Expand All @@ -241,7 +250,7 @@ public void initializeBridgeState() {
}
logger.warn("Initial full state request or result parsing failed", e);
if (!thingDisposing) {
scheduledFuture = scheduler.schedule(this::initializeBridgeState, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
connectionJob = scheduler.schedule(this::initializeBridgeState, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
}
return null;
});
Expand All @@ -251,15 +260,15 @@ public void initializeBridgeState() {
* Starts the websocket connection.
* {@link #initializeBridgeState} need to be called first to obtain the websocket port.
*/
private void startWebsocket() {
if (websocket.isConnected() || websocketPort == 0 || thingDisposing) {
private void startWebSocketConnection() {
if (webSocketConnection.isConnected() || websocketPort == 0 || thingDisposing) {
return;
}

stopTimer();
scheduledFuture = scheduler.schedule(this::startWebsocket, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
connectionJob = scheduler.schedule(this::startWebSocketConnection, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);

websocket.start(config.getHostWithoutPort() + ":" + websocketPort);
webSocketConnection.start(config.getHostWithoutPort() + ":" + websocketPort);
}

/**
Expand Down Expand Up @@ -294,29 +303,33 @@ public void initialize() {
public void dispose() {
thingDisposing = true;
stopTimer();
websocket.close();
webSocketConnection.dispose();
}

@Override
public void connectionEstablished() {
public void webSocketConnectionEstablished() {
stopTimer();
updateStatus(ThingStatus.ONLINE);
}

@Override
public void connectionLost(String reason) {
public void webSocketConnectionLost(String reason) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, reason);

stopTimer();

// make sure we get a new connection
webSocketConnection.dispose();
webSocketConnection = createNewWebSocketConnection();

// Wait for POLL_FREQUENCY_SEC after a connection was closed before trying again
scheduledFuture = scheduler.schedule(this::startWebsocket, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
connectionJob = scheduler.schedule(this::startWebSocketConnection, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
}

/**
* Return the websocket connection.
*/
public WebSocketConnection getWebsocketConnection() {
return websocket;
public WebSocketConnection getWebSocketConnection() {
return webSocketConnection;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jdt.annotation.NonNullByDefault;
Expand All @@ -28,6 +31,7 @@
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.openhab.core.common.ThreadPoolManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smarthomej.binding.deconz.internal.dto.DeconzBaseMessage;
Expand All @@ -47,23 +51,29 @@
public class WebSocketConnection {
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
private final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
private final ScheduledExecutorService scheduler = ThreadPoolManager.getScheduledPool("thingHandler");

private final WebSocketClient client;
private final String socketName;
private final Gson gson;
private final int watchdogInterval;

private final WebSocketConnectionListener connectionListener;
private final Map<String, WebSocketMessageListener> listeners = new ConcurrentHashMap<>();

private ConnectionState connectionState = ConnectionState.DISCONNECTED;
private @Nullable ScheduledFuture<?> watchdogJob;

private @Nullable Session session;

public WebSocketConnection(WebSocketConnectionListener listener, WebSocketClient client, Gson gson) {
public WebSocketConnection(WebSocketConnectionListener listener, WebSocketClient client, Gson gson,
int watchdogInterval) {
this.connectionListener = listener;
this.client = client;
this.client.setMaxIdleTimeout(0);
this.gson = gson;
this.socketName = "Websocket$" + System.currentTimeMillis() + "-" + INSTANCE_COUNTER.incrementAndGet();
this.watchdogInterval = watchdogInterval;
}

public void start(String ip) {
Expand All @@ -81,11 +91,36 @@ public void start(String ip) {
logger.debug("Trying to connect {} to {}", socketName, destUri);
client.connect(this, destUri).get();
} catch (Exception e) {
connectionListener.connectionLost("Error while connecting: " + e.getMessage());
String reason = "Error while connecting: " + e.getMessage();
logger.warn("{}: {}", socketName, reason);
connectionListener.webSocketConnectionLost(reason);
}
}

private void startOrResetWatchdogTimer() {
// TODO: remove log
logger.trace("Websocket WatchdogTimer reset.");
stopWatchdogTimer(); // stop already running timer
watchdogJob = scheduler.schedule(
() -> connectionListener.webSocketConnectionLost(
"Watchdog timed out after " + watchdogInterval + "s. Websocket seems to be dead."),
watchdogInterval, TimeUnit.SECONDS);
}

private void stopWatchdogTimer() {
ScheduledFuture<?> watchdogTimer = this.watchdogJob;
if (watchdogTimer != null) {
watchdogTimer.cancel(true);
this.watchdogJob = null;
}
}

public void close() {
/**
* dispose the websocket (close connection and destroy client)
*
*/
public void dispose() {
stopWatchdogTimer();
try {
connectionState = ConnectionState.DISCONNECTING;
client.stop();
Expand All @@ -109,7 +144,8 @@ public void onConnect(Session session) {
connectionState = ConnectionState.CONNECTED;
logger.debug("{} successfully connected to {}: {}", socketName, session.getRemoteAddress().getAddress(),
session.hashCode());
connectionListener.connectionEstablished();
connectionListener.webSocketConnectionEstablished();
startOrResetWatchdogTimer();
this.session = session;
}

Expand All @@ -120,6 +156,7 @@ public void onMessage(Session session, String message) {
handleWrongSession(session, message);
return;
}
startOrResetWatchdogTimer();
logger.trace("{} received raw data: {}", socketName, message);

try {
Expand Down Expand Up @@ -155,11 +192,8 @@ public void onMessage(Session session, String message) {
return;
}

DeconzBaseMessage deconzMessage = gson.fromJson(message, expectedMessageType);
if (deconzMessage != null) {
listener.messageReceived(deconzMessage);

}
DeconzBaseMessage deconzMessage = Objects.requireNonNull(gson.fromJson(message, expectedMessageType));
listener.messageReceived(deconzMessage);
} catch (RuntimeException e) {
// we need to catch all processing exceptions, otherwise they could affect the connection
logger.warn("{} encountered an error while processing the message {}: {}", socketName, message,
Expand All @@ -176,6 +210,7 @@ public void onError(Session session, Throwable cause) {
}
logger.warn("{} connection errored, closing: {}", socketName, cause.getMessage());

stopWatchdogTimer();
Session storedSession = this.session;
if (storedSession != null && storedSession.isOpen()) {
storedSession.close(-1, "Processing error");
Expand All @@ -191,8 +226,9 @@ public void onClose(Session session, int statusCode, String reason) {
}
logger.trace("{} closed connection: {} / {}", socketName, statusCode, reason);
connectionState = ConnectionState.DISCONNECTED;
stopWatchdogTimer();
this.session = null;
connectionListener.connectionLost(reason);
connectionListener.webSocketConnectionLost(reason);
}

private void handleWrongSession(Session session, String message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ public interface WebSocketConnectionListener {
/**
* Connection successfully established.
*/
void connectionEstablished();
void webSocketConnectionEstablished();

/**
* Connection lost. A reconnect timer has been started.
*
* @param reason A reason for the disconnection
*/
void connectionLost(String reason);
void webSocketConnectionLost(String reason);
}