From 3e7ecbf79cb576329d0d579f06ff263e3eacf12f Mon Sep 17 00:00:00 2001 From: lolodomo Date: Sat, 6 Feb 2021 10:52:47 +0100 Subject: [PATCH] [remoteopenhab] Detect a remote server shutdown and reconnect properly (#10060) when the remote server is alive again Related to #9680 Signed-off-by: Laurent Garnier --- .../handler/RemoteopenhabBridgeHandler.java | 16 +++++++++++--- .../RemoteopenhabStreamingDataListener.java | 7 ++++++- .../rest/RemoteopenhabRestClient.java | 21 ++++++++++++------- 3 files changed, 33 insertions(+), 11 deletions(-) diff --git a/bundles/org.openhab.binding.remoteopenhab/src/main/java/org/openhab/binding/remoteopenhab/internal/handler/RemoteopenhabBridgeHandler.java b/bundles/org.openhab.binding.remoteopenhab/src/main/java/org/openhab/binding/remoteopenhab/internal/handler/RemoteopenhabBridgeHandler.java index 6554658814e64..ad196ddc52a27 100644 --- a/bundles/org.openhab.binding.remoteopenhab/src/main/java/org/openhab/binding/remoteopenhab/internal/handler/RemoteopenhabBridgeHandler.java +++ b/bundles/org.openhab.binding.remoteopenhab/src/main/java/org/openhab/binding/remoteopenhab/internal/handler/RemoteopenhabBridgeHandler.java @@ -183,7 +183,7 @@ public void initialize() { @Override public void dispose() { logger.debug("Disposing remote openHAB handler for bridge {}", getThing().getUID()); - stopStreamingUpdates(); + stopStreamingUpdates(false); stopCheckConnectionJob(); channelsLastStates.clear(); } @@ -380,7 +380,8 @@ private void startCheckConnectionJob(int accessibilityInterval, int aliveInterva if (localCheckConnectionJob == null || localCheckConnectionJob.isCancelled()) { checkConnectionJob = scheduler.scheduleWithFixedDelay(() -> { long millisSinceLastEvent = System.currentTimeMillis() - restClient.getLastEventTimestamp(); - if (aliveInterval == 0 || restClient.getLastEventTimestamp() == 0) { + if (getThing().getStatus() != ThingStatus.ONLINE || aliveInterval == 0 + || restClient.getLastEventTimestamp() == 0) { logger.debug("Time to check server accessibility"); checkConnection(); } else if (millisSinceLastEvent > (aliveInterval * 60000)) { @@ -421,8 +422,12 @@ private void startStreamingUpdates() { } private void stopStreamingUpdates() { + stopStreamingUpdates(true); + } + + private void stopStreamingUpdates(boolean waitingForCompletion) { synchronized (restClient) { - restClient.stop(); + restClient.stop(waitingForCompletion); restClient.removeStreamingDataListener(this); restClient.removeItemsDataListener(this); } @@ -437,6 +442,11 @@ public void onConnected() { updateStatus(ThingStatus.ONLINE); } + @Override + public void onDisconnected() { + updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, "Disconected from the remote server"); + } + @Override public void onError(String message) { updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, message); diff --git a/bundles/org.openhab.binding.remoteopenhab/src/main/java/org/openhab/binding/remoteopenhab/internal/listener/RemoteopenhabStreamingDataListener.java b/bundles/org.openhab.binding.remoteopenhab/src/main/java/org/openhab/binding/remoteopenhab/internal/listener/RemoteopenhabStreamingDataListener.java index 969317107a8e3..d1d325a3ca0c4 100644 --- a/bundles/org.openhab.binding.remoteopenhab/src/main/java/org/openhab/binding/remoteopenhab/internal/listener/RemoteopenhabStreamingDataListener.java +++ b/bundles/org.openhab.binding.remoteopenhab/src/main/java/org/openhab/binding/remoteopenhab/internal/listener/RemoteopenhabStreamingDataListener.java @@ -24,10 +24,15 @@ public interface RemoteopenhabStreamingDataListener { /** - * The client successfully established a connection. + * The client successfully established a connection and received a first event. */ void onConnected(); + /** + * The client was disconnected. + */ + void onDisconnected(); + /** * An error message was published. */ diff --git a/bundles/org.openhab.binding.remoteopenhab/src/main/java/org/openhab/binding/remoteopenhab/internal/rest/RemoteopenhabRestClient.java b/bundles/org.openhab.binding.remoteopenhab/src/main/java/org/openhab/binding/remoteopenhab/internal/rest/RemoteopenhabRestClient.java index a6b7ca7ccd0d8..ebde376114ec6 100644 --- a/bundles/org.openhab.binding.remoteopenhab/src/main/java/org/openhab/binding/remoteopenhab/internal/rest/RemoteopenhabRestClient.java +++ b/bundles/org.openhab.binding.remoteopenhab/src/main/java/org/openhab/binding/remoteopenhab/internal/rest/RemoteopenhabRestClient.java @@ -90,6 +90,7 @@ public class RemoteopenhabRestClient { private String accessToken; private boolean trustedCertificate; private boolean connected; + private boolean completed; private @Nullable SseEventSource eventSource; private long lastEventTimestamp; @@ -237,10 +238,10 @@ public void start() { } } - public void stop() { + public void stop(boolean waitingForCompletion) { synchronized (startStopLock) { logger.debug("Closing EventSource"); - closeEventSource(0, TimeUnit.SECONDS); + closeEventSource(waitingForCompletion); logger.debug("EventSource stopped"); lastEventTimestamp = 0; } @@ -263,7 +264,7 @@ public boolean verify(@Nullable String hostname, @Nullable SSLSession session) { .register(new RemoteopenhabStreamingRequestFilter(accessToken)).build(); } SseEventSource eventSource = eventSourceFactory.newSource(client.target(restSseUrl)); - eventSource.register(this::onEvent, this::onError); + eventSource.register(this::onEvent, this::onError, this::onComplete); return eventSource; } @@ -279,7 +280,7 @@ private void reopenEventSource() { return; } - closeEventSource(10, TimeUnit.SECONDS); + closeEventSource(true); logger.debug("Opening new EventSource {}", url); SseEventSource localEventSource = createEventSource(url); @@ -288,12 +289,12 @@ private void reopenEventSource() { eventSource = localEventSource; } - private void closeEventSource(long timeout, TimeUnit timeoutUnit) { + private void closeEventSource(boolean waitingForCompletion) { SseEventSource localEventSource = eventSource; if (localEventSource != null) { - if (!localEventSource.isOpen()) { + if (!localEventSource.isOpen() || completed) { logger.debug("Existing EventSource is already closed"); - } else if (localEventSource.close(timeout, timeoutUnit)) { + } else if (localEventSource.close(waitingForCompletion ? 10 : 0, TimeUnit.SECONDS)) { logger.debug("Succesfully closed existing EventSource"); } else { logger.debug("Failed to close existing EventSource"); @@ -435,6 +436,12 @@ private void onEvent(InboundSseEvent inboundEvent) { } } + private void onComplete() { + logger.debug("Disconnected from streaming events"); + completed = true; + listeners.forEach(listener -> listener.onDisconnected()); + } + private void onError(Throwable error) { logger.debug("Error occurred while receiving events", error); listeners.forEach(listener -> listener.onError("Error occurred while receiving events"));