Skip to content

Commit

Permalink
[remoteopenhab] Detect a remote server shutdown and reconnect properly (
Browse files Browse the repository at this point in the history
openhab#10060)

when the remote server is alive again

Related to openhab#9680

Signed-off-by: Laurent Garnier <lg.hc@free.fr>
  • Loading branch information
lolodomo authored and thinkingstone committed Nov 7, 2021
1 parent e330f3d commit 819d757
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void initialize() {
@Override
public void dispose() {
logger.debug("Disposing remote openHAB handler for bridge {}", getThing().getUID());
stopStreamingUpdates();
stopStreamingUpdates(false);
stopCheckConnectionJob();
channelsLastStates.clear();
}
Expand Down Expand Up @@ -380,7 +380,8 @@ private void startCheckConnectionJob(int accessibilityInterval, int aliveInterva
if (localCheckConnectionJob == null || localCheckConnectionJob.isCancelled()) {
checkConnectionJob = scheduler.scheduleWithFixedDelay(() -> {
long millisSinceLastEvent = System.currentTimeMillis() - restClient.getLastEventTimestamp();
if (aliveInterval == 0 || restClient.getLastEventTimestamp() == 0) {
if (getThing().getStatus() != ThingStatus.ONLINE || aliveInterval == 0
|| restClient.getLastEventTimestamp() == 0) {
logger.debug("Time to check server accessibility");
checkConnection();
} else if (millisSinceLastEvent > (aliveInterval * 60000)) {
Expand Down Expand Up @@ -421,8 +422,12 @@ private void startStreamingUpdates() {
}

private void stopStreamingUpdates() {
stopStreamingUpdates(true);
}

private void stopStreamingUpdates(boolean waitingForCompletion) {
synchronized (restClient) {
restClient.stop();
restClient.stop(waitingForCompletion);
restClient.removeStreamingDataListener(this);
restClient.removeItemsDataListener(this);
}
Expand All @@ -437,6 +442,11 @@ public void onConnected() {
updateStatus(ThingStatus.ONLINE);
}

@Override
public void onDisconnected() {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, "Disconected from the remote server");
}

@Override
public void onError(String message) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@
public interface RemoteopenhabStreamingDataListener {

/**
* The client successfully established a connection.
* The client successfully established a connection and received a first event.
*/
void onConnected();

/**
* The client was disconnected.
*/
void onDisconnected();

/**
* An error message was published.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public class RemoteopenhabRestClient {
private String accessToken;
private boolean trustedCertificate;
private boolean connected;
private boolean completed;

private @Nullable SseEventSource eventSource;
private long lastEventTimestamp;
Expand Down Expand Up @@ -237,10 +238,10 @@ public void start() {
}
}

public void stop() {
public void stop(boolean waitingForCompletion) {
synchronized (startStopLock) {
logger.debug("Closing EventSource");
closeEventSource(0, TimeUnit.SECONDS);
closeEventSource(waitingForCompletion);
logger.debug("EventSource stopped");
lastEventTimestamp = 0;
}
Expand All @@ -263,7 +264,7 @@ public boolean verify(@Nullable String hostname, @Nullable SSLSession session) {
.register(new RemoteopenhabStreamingRequestFilter(accessToken)).build();
}
SseEventSource eventSource = eventSourceFactory.newSource(client.target(restSseUrl));
eventSource.register(this::onEvent, this::onError);
eventSource.register(this::onEvent, this::onError, this::onComplete);
return eventSource;
}

Expand All @@ -279,7 +280,7 @@ private void reopenEventSource() {
return;
}

closeEventSource(10, TimeUnit.SECONDS);
closeEventSource(true);

logger.debug("Opening new EventSource {}", url);
SseEventSource localEventSource = createEventSource(url);
Expand All @@ -288,12 +289,12 @@ private void reopenEventSource() {
eventSource = localEventSource;
}

private void closeEventSource(long timeout, TimeUnit timeoutUnit) {
private void closeEventSource(boolean waitingForCompletion) {
SseEventSource localEventSource = eventSource;
if (localEventSource != null) {
if (!localEventSource.isOpen()) {
if (!localEventSource.isOpen() || completed) {
logger.debug("Existing EventSource is already closed");
} else if (localEventSource.close(timeout, timeoutUnit)) {
} else if (localEventSource.close(waitingForCompletion ? 10 : 0, TimeUnit.SECONDS)) {
logger.debug("Succesfully closed existing EventSource");
} else {
logger.debug("Failed to close existing EventSource");
Expand Down Expand Up @@ -435,6 +436,12 @@ private void onEvent(InboundSseEvent inboundEvent) {
}
}

private void onComplete() {
logger.debug("Disconnected from streaming events");
completed = true;
listeners.forEach(listener -> listener.onDisconnected());
}

private void onError(Throwable error) {
logger.debug("Error occurred while receiving events", error);
listeners.forEach(listener -> listener.onError("Error occurred while receiving events"));
Expand Down

0 comments on commit 819d757

Please sign in to comment.