Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[neohub] Avoid too frequent requests to hub #15743

Merged
merged 5 commits into from
Dec 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.openhab.binding.neohub.internal.NeoHubBindingConstants.*;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
Expand Down Expand Up @@ -61,6 +62,8 @@ public class NeoHubHandler extends BaseBridgeHandler {

private static final String SEE_README = "See documentation chapter \"Connection Refused Errors\"";
private static final int MAX_FAILED_SEND_ATTEMPTS = 2;
private static final Duration MIN_RESTART_DELAY = Duration.ofSeconds(10);
private static final Duration MAX_RESTART_DELAY = Duration.ofHours(1);

private final Logger logger = LoggerFactory.getLogger(NeoHubHandler.class);

Expand Down Expand Up @@ -91,6 +94,8 @@ private ApiVersion(String label) {
private ApiVersion apiVersion = ApiVersion.LEGACY;
private boolean isApiOnline = false;
private int failedSendAttempts = 0;
private Duration restartDelay = Duration.from(MIN_RESTART_DELAY);
private @Nullable ScheduledFuture<?> restartTask;

public NeoHubHandler(Bridge bridge, WebSocketFactory webSocketFactory) {
super(bridge);
Expand Down Expand Up @@ -148,21 +153,12 @@ public void initialize() {
logger.debug("hub '{}' preferLegacyApi={}", getThing().getUID(), config.preferLegacyApi);
}

// create a web or TCP socket based on the port number in the configuration
NeoHubSocketBase socket;
try {
if (config.useWebSocket) {
socket = new NeoHubWebSocket(config, webSocketFactory, thing.getUID());
} else {
socket = new NeoHubSocket(config, thing.getUID().getAsString());
}
} catch (IOException e) {
logger.debug("\"hub '{}' error creating web/tcp socket: '{}'", getThing().getUID(), e.getMessage());
this.config = config;
NeoHubSocketBase socket = createSocket();
if (socket == null) {
return;
}

this.socket = socket;
this.config = config;

/*
* Try to 'ping' the hub, and if there is a 'connection refused', it is probably due to the mobile App |
Expand Down Expand Up @@ -206,10 +202,39 @@ public void initialize() {
startFastPollingBurst();
}

/**
* Create a web or TCP socket based on the configuration setting
*/
private @Nullable NeoHubSocketBase createSocket() {
NeoHubConfiguration config = this.config;
if (config == null) {
logger.debug("\"hub '{}' configuration is null", getThing().getUID());
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR);
} else {
try {
if (config.useWebSocket) {
return new NeoHubWebSocket(config, webSocketFactory, thing.getUID());
} else {
return new NeoHubSocket(config, thing.getUID().getAsString());
}
} catch (IOException e) {
logger.debug("\"hub '{}' error creating web/tcp socket: '{}'", getThing().getUID(), e.getMessage());
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
}
}
return null;
}

@Override
public void dispose() {
if (logger.isDebugEnabled()) {
logger.debug("hub '{}' stop background polling..", getThing().getUID());
logger.debug("hub '{}' shutting down..", getThing().getUID());
}

closeSocket();
ScheduledFuture<?> restartTask = this.restartTask;
if (restartTask != null) {
restartTask.cancel(true);
}

// clean up the lazy polling scheduler
Expand All @@ -225,14 +250,16 @@ public void dispose() {
fast.cancel(true);
this.fastPollingScheduler = null;
}
}

private void closeSocket() {
NeoHubSocketBase socket = this.socket;
this.socket = null;
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
}
this.socket = null;
}
}

Expand Down Expand Up @@ -276,8 +303,7 @@ public synchronized NeoHubReturnResult toNeoHubSendChannelValue(String commandSt
protected @Nullable NeoHubAbstractDeviceData fromNeoHubGetDeviceData() {
NeoHubSocketBase socket = this.socket;

if (socket == null || config == null) {
logger.warn(MSG_HUB_CONFIG, getThing().getUID());
if (socket == null) {
return null;
}

Expand Down Expand Up @@ -309,6 +335,7 @@ public synchronized NeoHubReturnResult toNeoHubSendChannelValue(String commandSt

if (getThing().getStatus() != ThingStatus.ONLINE) {
updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE);
restartDelay = Duration.from(MIN_RESTART_DELAY);
}

// check if we also need to discard and update systemData
Expand Down Expand Up @@ -340,8 +367,24 @@ public synchronized NeoHubReturnResult toNeoHubSendChannelValue(String commandSt
} catch (IOException | NeoHubException e) {
logger.warn(MSG_FMT_DEVICE_POLL_ERR, getThing().getUID(), e.getMessage());
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
return null;
scheduleRestart();
}
return null;
}

private synchronized void scheduleRestart() {
closeSocket();
restartTask = scheduler.schedule(() -> {
NeoHubSocketBase socket = createSocket();
this.socket = socket;
if (!Thread.interrupted() && socket == null) { // keep trying..
restartDelay = restartDelay.plus(restartDelay);
if (restartDelay.compareTo(MAX_RESTART_DELAY) > 0) {
restartDelay = Duration.from(MAX_RESTART_DELAY);
}
scheduleRestart();
}
}, restartDelay.toSeconds(), TimeUnit.SECONDS);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public synchronized String sendMessage(final String requestJson) throws IOExcept
IOException caughtException = null;
StringBuilder builder = new StringBuilder();

throttle();
try (Socket socket = new Socket()) {
int port = config.portNumber > 0 ? config.portNumber : NeoHubBindingConstants.PORT_TCP;
socket.connect(new InetSocketAddress(config.hostName, port), config.socketTimeout * 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

import org.eclipse.jdt.annotation.NonNullByDefault;

Expand All @@ -29,6 +32,9 @@ public abstract class NeoHubSocketBase implements Closeable {
protected final NeoHubConfiguration config;
protected final String hubId;

private static final int REQUEST_INTERVAL_MILLISECS = 1000;
private Optional<Instant> lastRequestTime = Optional.empty();

public NeoHubSocketBase(NeoHubConfiguration config, String hubId) {
this.config = config;
this.hubId = hubId;
Expand All @@ -43,4 +49,24 @@ public NeoHubSocketBase(NeoHubConfiguration config, String hubId) {
* @throws NeoHubException if the communication returned a response but the response was not valid JSON
*/
public abstract String sendMessage(final String requestJson) throws IOException, NeoHubException;

/**
* Method for throttling requests to prevent overloading the hub.
* <p>
* The NeoHub can get confused if, while it is uploading data to the cloud, it also receives too many local
* requests, so this method throttles the requests to one per REQUEST_INTERVAL_MILLISECS maximum.
*
* @throws NeoHubException if the wait is interrupted
*/
protected synchronized void throttle() throws NeoHubException {
try {
Instant now = Instant.now();
long delay = lastRequestTime
.map(t -> Math.max(0, Duration.between(now, t).toMillis() + REQUEST_INTERVAL_MILLISECS)).orElse(0L);
lastRequestTime = Optional.of(now.plusMillis(delay));
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new NeoHubException("Throttle sleep interrupted", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Instant;
import java.util.concurrent.ExecutionException;

import org.eclipse.jdt.annotation.NonNullByDefault;
Expand Down Expand Up @@ -116,9 +117,9 @@ private void startSession() throws IOException {
*/
private void closeSession() {
Session session = this.session;
this.session = null;
if (session != null) {
session.close();
this.session = null;
}
}

Expand Down Expand Up @@ -172,19 +173,19 @@ public synchronized String sendMessage(final String requestJson) throws IOExcept
responsePending = true;

IOException caughtException = null;
throttle();
try {
// send the request
logger.debug("hub '{}' sending characters:{}", hubId, requestOuter.length());
session.getRemote().sendString(requestOuter);
logger.trace("hub '{}' sent:{}", hubId, requestOuter);

// sleep and loop until we get a response or the socket is closed
int sleepRemainingMilliseconds = config.socketTimeout * 1000;
// sleep and loop until we get a response, the socket is closed, or it times out
Instant timeout = Instant.now().plusSeconds(config.socketTimeout);
while (responsePending) {
try {
Thread.sleep(SLEEP_MILLISECONDS);
sleepRemainingMilliseconds = sleepRemainingMilliseconds - SLEEP_MILLISECONDS;
if (sleepRemainingMilliseconds <= 0) {
if (Instant.now().isAfter(timeout)) {
throw new IOException("Read timed out");
}
} catch (InterruptedException e) {
Expand All @@ -195,6 +196,9 @@ public synchronized String sendMessage(final String requestJson) throws IOExcept
caughtException = e;
}

caughtException = caughtException != null ? caughtException
: this.session == null ? new IOException("WebSocket session closed") : null;

logger.debug("hub '{}' received characters:{}", hubId, responseOuter.length());
logger.trace("hub '{}' received:{}", hubId, responseOuter);

Expand Down