Skip to content

Commit

Permalink
[modbus] Fixed caching in ModbusPollerThingHandler
Browse files Browse the repository at this point in the history
Signed-off-by: Nagy Attila Gabor <mrbig@sneaker.hu>
  • Loading branch information
mrbig committed Jun 30, 2020
1 parent acfb9b5 commit 66225e8
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,15 @@ public class ModbusPollerThingHandler extends BaseBridgeHandler {
private class ReadCallbackDelegator
implements ModbusReadCallback, ModbusFailureCallback<ModbusReadRequestBlueprint> {

private volatile @Nullable AtomicStampedValue<AsyncModbusReadResult> lastResult;
private volatile @Nullable AtomicStampedValue<PollResult> lastResult;

@Override
public synchronized void handle(AsyncModbusReadResult result) {
public synchronized void handleResult(PollResult result) {
// Ignore all incoming data and errors if configuration is not correct
if (hasConfigurationError() || disposed) {
return;
}
if (config.getCacheMillis() >= 0) {
AtomicStampedValue<AsyncModbusReadResult> localLastResult = this.lastResult;
AtomicStampedValue<PollResult> localLastResult = this.lastResult;
if (localLastResult == null) {
this.lastResult = new AtomicStampedValue<>(System.currentTimeMillis(), result);
} else {
Expand All @@ -84,16 +83,25 @@ public synchronized void handle(AsyncModbusReadResult result) {
}
}
logger.debug("Thing {} received response {}", thing.getUID(), result);
childCallbacks.forEach(handler -> handler.onReadResult(result));
resetCommunicationError();
notifyChildren(result);
if (result.failure != null) {
Exception error = result.failure.getCause();
assert error != null;
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
String.format("Error with read: %s: %s", error.getClass().getName(), error.getMessage()));
} else {
resetCommunicationError();
}
}

@Override
public synchronized void handle(AsyncModbusReadResult result) {
handleResult(new PollResult(result));
}

@Override
public synchronized void handle(AsyncModbusFailure<ModbusReadRequestBlueprint> failure) {
Exception error = failure.getCause();
assert error != null;
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
String.format("Error with read: %s: %s", error.getClass().getName(), error.getMessage()));
handleResult(new PollResult(failure));
}

private void resetCommunicationError() {
Expand All @@ -115,11 +123,25 @@ public boolean updateChildrenWithOldData(long oldestStamp) {
return Optional.ofNullable(this.lastResult).map(result -> result.copyIfStampAfter(oldestStamp))
.map(result -> {
logger.debug("Thing {} reusing cached data: {}", thing.getUID(), result.getValue());
childCallbacks.forEach(handler -> handler.onReadResult(result.getValue()));
notifyChildren(result.getValue());
return true;
}).orElse(false);
}

private void notifyChildren(PollResult pollResult) {
@Nullable
AsyncModbusReadResult result = pollResult.result;
@Nullable
AsyncModbusFailure<ModbusReadRequestBlueprint> failure = pollResult.failure;
childCallbacks.forEach(handler -> {
if (result != null) {
handler.onReadResult(result);
} else if (failure != null) {
handler.handleReadError(failure);
}
});
}

/**
* Rest data caches
*/
Expand Down Expand Up @@ -155,6 +177,26 @@ public ModbusPollerReadRequest(ModbusPollerConfiguration config,
}
}

/**
* Immutable data object to cache the results of a poll request
*/
private class PollResult {
@Nullable
public final AsyncModbusReadResult result;
@Nullable
public final AsyncModbusFailure<ModbusReadRequestBlueprint> failure;

PollResult(AsyncModbusReadResult result) {
this.result = result;
this.failure = null;
}

PollResult(AsyncModbusFailure<ModbusReadRequestBlueprint> failure) {
this.result = null;
this.failure = failure;
}
}

private final Logger logger = LoggerFactory.getLogger(ModbusPollerThingHandler.class);

@NonNullByDefault({})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.notNull;
import static org.mockito.Mockito.*;
import static org.openhab.binding.modbus.internal.ModbusBindingConstantsInternal.*;

Expand Down Expand Up @@ -778,7 +778,7 @@ public void testRefreshOnData() throws InterruptedException {
builder -> builder.withConfiguration(dataConfig), bundleContext);
assertThat(dataHandler.getThing().getStatus(), is(equalTo(ThingStatus.ONLINE)));

verify(comms, never()).submitOneTimePoll(request, notNull(), notNull());
verify(comms, never()).submitOneTimePoll(eq(request), notNull(), notNull());
// Reset initial REFRESH commands to data thing channels from the Core
reset(poller.getHandler());
dataHandler.handleCommand(Mockito.mock(ChannelUID.class), RefreshType.REFRESH);
Expand Down

0 comments on commit 66225e8

Please sign in to comment.