Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[netatmo] Ensure to close all scheduled jobs #16056

Merged
merged 3 commits into from
Dec 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public void handleCommand(ChannelUID channelUID, Command command) {
logger.info("Unable to instantiate {}, expected scope {} is not active", clazz, expected);
}
} catch (SecurityException | ReflectiveOperationException e) {
logger.warn("Error invoking RestManager constructor for class {} : {}", clazz, e.getMessage());
logger.warn("Error invoking RestManager constructor for class {}: {}", clazz, e.getMessage());
}
}
return (T) managers.get(clazz);
Expand All @@ -319,7 +319,7 @@ public synchronized <T> T executeUri(URI uri, HttpMethod method, Class<T> clazz,
request.content(inputStreamContentProvider, contentType);
request.header(HttpHeader.ACCEPT, "application/json");
}
logger.trace(" -with payload : {} ", payload);
logger.trace(" -with payload: {} ", payload);
}

if (isLinked(requestCountChannelUID)) {
Expand All @@ -331,13 +331,13 @@ public synchronized <T> T executeUri(URI uri, HttpMethod method, Class<T> clazz,
}
updateState(requestCountChannelUID, new DecimalType(requestsTimestamps.size()));
}
logger.trace(" -with headers : {} ",
logger.trace(" -with headers: {} ",
String.join(", ", request.getHeaders().stream().map(HttpField::toString).toList()));
ContentResponse response = request.send();

Code statusCode = HttpStatus.getCode(response.getStatus());
String responseBody = new String(response.getContent(), StandardCharsets.UTF_8);
logger.trace(" -returned : code {} body {}", statusCode, responseBody);
logger.trace(" -returned: code {} body {}", statusCode, responseBody);

if (statusCode == Code.OK) {
return deserializer.deserialize(clazz, responseBody);
Expand All @@ -347,7 +347,7 @@ public synchronized <T> T executeUri(URI uri, HttpMethod method, Class<T> clazz,
try {
exception = new NetatmoException(deserializer.deserialize(ApiError.class, responseBody));
} catch (NetatmoException e) {
exception = new NetatmoException("Error deserializing error : %s".formatted(statusCode.getMessage()));
exception = new NetatmoException("Error deserializing error: %s".formatted(statusCode.getMessage()));
}
throw exception;
} catch (NetatmoException e) {
Expand All @@ -359,10 +359,10 @@ public synchronized <T> T executeUri(URI uri, HttpMethod method, Class<T> clazz,
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getMessage());
throw new NetatmoException(String.format("%s: \"%s\"", e.getClass().getName(), e.getMessage()));
throw new NetatmoException("Request interrupted");
} catch (TimeoutException | ExecutionException e) {
if (retryCount > 0) {
logger.debug("Request timedout, retry counter : {}", retryCount);
logger.debug("Request timedout, retry counter: {}", retryCount);
return executeUri(uri, method, clazz, payload, contentType, retryCount - 1);
}
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, "@text/request-time-out");
Expand Down Expand Up @@ -427,7 +427,7 @@ public void identifyAllModulesAndApplyAction(BiFunction<NAModule, ThingUID, Opti
});
}
} catch (NetatmoException e) {
logger.warn("Error while identifying all modules : {}", e.getMessage());
logger.warn("Error while identifying all modules: {}", e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import org.eclipse.jdt.annotation.NonNullByDefault;
Expand All @@ -30,6 +29,7 @@
import org.openhab.binding.netatmo.internal.handler.capability.Capability;
import org.openhab.binding.netatmo.internal.handler.capability.CapabilityMap;
import org.openhab.binding.netatmo.internal.handler.capability.HomeCapability;
import org.openhab.binding.netatmo.internal.handler.capability.ParentUpdateCapability;
import org.openhab.binding.netatmo.internal.handler.capability.RefreshCapability;
import org.openhab.binding.netatmo.internal.handler.capability.RestCapability;
import org.openhab.core.config.core.Configuration;
Expand Down Expand Up @@ -220,37 +220,22 @@ default void commonInitialize() {
setThingStatus(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_UNINITIALIZED, null);
} else if (!ThingStatus.ONLINE.equals(bridge.getStatus())) {
setThingStatus(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE, null);
removeRefreshCapability();
getCapabilities().remove(RefreshCapability.class);
getCapabilities().remove(ParentUpdateCapability.class);
} else {
setThingStatus(ThingStatus.UNKNOWN, ThingStatusDetail.NONE, null);
setRefreshCapability();
getScheduler().schedule(() -> {
CommonInterface bridgeHandler = getBridgeHandler();
if (bridgeHandler != null) {
bridgeHandler.expireData();
}
}, 1, TimeUnit.SECONDS);
if (ModuleType.ACCOUNT.equals(getModuleType().getBridge())) {
NAThingConfiguration config = getThing().getConfiguration().as(NAThingConfiguration.class);
getCapabilities().put(new RefreshCapability(this, config.refreshInterval));
}
getCapabilities().put(new ParentUpdateCapability(this));
}
}

default ModuleType getModuleType() {
return ModuleType.from(getThing().getThingTypeUID());
}

default void setRefreshCapability() {
if (ModuleType.ACCOUNT.equals(getModuleType().getBridge())) {
NAThingConfiguration config = getThing().getConfiguration().as(NAThingConfiguration.class);
getCapabilities().put(new RefreshCapability(this, getScheduler(), config.refreshInterval));
}
}

default void removeRefreshCapability() {
Capability refreshCap = getCapabilities().remove(RefreshCapability.class);
if (refreshCap != null) {
refreshCap.dispose();
}
}

default void commonDispose() {
getCapabilities().values().forEach(Capability::dispose);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.ConcurrentHashMap;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;

/**
* {@link CapabilityMap} is a specialized Map designed to store capabilities
Expand All @@ -40,4 +41,12 @@ public <T extends Capability> Optional<T> get(Class<T> clazz) {
T cap = (T) super.get(clazz);
return Optional.ofNullable(cap);
}

public <T extends Capability> void remove(Class<?> clazz) {
@Nullable
Capability cap = super.remove(clazz);
if (cap != null) {
cap.dispose();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Copyright (c) 2010-2023 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.netatmo.internal.handler.capability;

import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.binding.netatmo.internal.handler.CommonInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@link ParentUpdateCapability} is the class used to request data update upon initialization of a module
*
* @author Gaël L'hopital - Initial contribution
*
*/
@NonNullByDefault
public class ParentUpdateCapability extends Capability {
private static final int DEFAULT_DELAY_S = 2;

private final Logger logger = LoggerFactory.getLogger(ParentUpdateCapability.class);
private Optional<ScheduledFuture<?>> job = Optional.empty();

public ParentUpdateCapability(CommonInterface handler) {
super(handler);
}

@Override
public void initialize() {
job = Optional.of(handler.getScheduler().schedule(() -> {
logger.debug("Requesting parents data update for Thing {}", handler.getId());
CommonInterface bridgeHandler = handler.getBridgeHandler();
if (bridgeHandler != null) {
bridgeHandler.expireData();
}
}, DEFAULT_DELAY_S, TimeUnit.SECONDS));
}

@Override
public void dispose() {
job.ifPresent(j -> j.cancel(true));
job = Optional.empty();
super.dispose();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

Expand All @@ -42,18 +41,20 @@ public class RefreshCapability extends Capability {
private static final Duration OFFLINE_INTERVAL = Duration.of(15, MINUTES);

private final Logger logger = LoggerFactory.getLogger(RefreshCapability.class);
private final ScheduledExecutorService scheduler;

private Duration dataValidity;
private Instant dataTimeStamp = Instant.now();
private Instant dataTimeStamp0 = Instant.MIN;
private Optional<ScheduledFuture<?>> refreshJob = Optional.empty();
private boolean refreshConfigured;

public RefreshCapability(CommonInterface handler, ScheduledExecutorService scheduler, int refreshInterval) {
public RefreshCapability(CommonInterface handler, int refreshInterval) {
super(handler);
this.scheduler = scheduler;
this.dataValidity = Duration.ofSeconds(Math.max(0, refreshInterval));
}

@Override
public void initialize() {
this.refreshConfigured = !probing();
freeJobAndReschedule(2);
}
Expand Down Expand Up @@ -109,16 +110,16 @@ protected void updateNAThing(NAThing newData) {
refreshConfigured = true;
logger.debug("Data validity period identified to be {}", dataValidity);
} else {
logger.debug("Data validity period not yet found - data timestamp unchanged");
logger.debug("Data validity period not yet found, data timestamp unchanged");
}
}
dataTimeStamp = tsInstant;
});
}

private void freeJobAndReschedule(long delay) {
refreshJob.ifPresent(job -> job.cancel(false));
refreshJob = Optional
.ofNullable(delay == 0 ? null : scheduler.schedule(() -> proceedWithUpdate(), delay, TimeUnit.SECONDS));
refreshJob.ifPresent(job -> job.cancel(true));
refreshJob = Optional.ofNullable(delay == 0 ? null
: handler.getScheduler().schedule(() -> proceedWithUpdate(), delay, TimeUnit.SECONDS));
}
}