Skip to content

Commit

Permalink
[novafinedust] Use fire and forget commands to configure device (#10210)
Browse files Browse the repository at this point in the history
Since the device does not follow its own protocol, we do not evaluate its
replies to our configuration commands but rather do a fire and forget.

Signed-off-by: Stefan Triller <github@stefantriller.de>
  • Loading branch information
t2000 authored Feb 22, 2021
1 parent a30b5df commit fe7b91f
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.TooManyListenersException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -62,6 +63,7 @@ public class SDS011Handler extends BaseThingHandler {

private @Nullable ScheduledFuture<?> dataReadJob;
private @Nullable ScheduledFuture<?> connectionMonitor;
private @Nullable Future<?> initJob;
private @Nullable ScheduledFuture<?> retryInitJob;

private ZonedDateTime lastCommunication = ZonedDateTime.now();
Expand Down Expand Up @@ -115,10 +117,10 @@ public void initialize() {

if (config.reporting) {
timeBetweenDataShouldArrive = Duration.ofMinutes(config.reportingInterval);
scheduler.submit(() -> initializeCommunicator(WorkMode.REPORTING, timeBetweenDataShouldArrive));
initJob = scheduler.submit(() -> initializeCommunicator(WorkMode.REPORTING, timeBetweenDataShouldArrive));
} else {
timeBetweenDataShouldArrive = Duration.ofSeconds(config.pollingInterval);
scheduler.submit(() -> initializeCommunicator(WorkMode.POLLING, timeBetweenDataShouldArrive));
initJob = scheduler.submit(() -> initializeCommunicator(WorkMode.POLLING, timeBetweenDataShouldArrive));
}
}

Expand All @@ -130,66 +132,45 @@ private void initializeCommunicator(WorkMode mode, Duration interval) {
return;
}

boolean initSuccessful = false;
int retryInit = 3;
int retryCount = 0;
// sometimes the device is a little difficult and needs multiple configuration attempts
while (!initSuccessful && retryCount < retryInit) {
logger.trace("Trying to initialize device attempt={}", retryCount);
initSuccessful = doInit(localCommunicator, mode, interval);
retryCount++;
}

if (initSuccessful) {
lastCommunication = ZonedDateTime.now();
updateStatus(ThingStatus.ONLINE);
logger.trace("Trying to initialize device");
doInit(localCommunicator, mode, interval);

if (mode == WorkMode.POLLING) {
dataReadJob = scheduler.scheduleWithFixedDelay(() -> {
try {
localCommunicator.requestSensorData();
} catch (IOException e) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
"Cannot query data from device");
}
}, 2, config.pollingInterval, TimeUnit.SECONDS);
} else {
// start a job that reads the port until data arrives
int reportingReadStartDelay = 10;
int startReadBeforeDataArrives = 5;
long readReportedDataInterval = (config.reportingInterval * 60) - reportingReadStartDelay
- startReadBeforeDataArrives;
logger.trace("Scheduling job to receive reported values");
dataReadJob = scheduler.scheduleWithFixedDelay(() -> {
try {
localCommunicator.readSensorData();
} catch (IOException e) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
"Cannot query data from device, because: " + e.getMessage());
}
}, reportingReadStartDelay, readReportedDataInterval, TimeUnit.SECONDS);
}
lastCommunication = ZonedDateTime.now();

Duration connectionMonitorStartDelay = timeBetweenDataShouldArrive
.plus(CONNECTION_MONITOR_START_DELAY_OFFSET);
connectionMonitor = scheduler.scheduleWithFixedDelay(this::verifyIfStillConnected,
connectionMonitorStartDelay.getSeconds(), timeBetweenDataShouldArrive.getSeconds(),
TimeUnit.SECONDS);
if (mode == WorkMode.POLLING) {
dataReadJob = scheduler.scheduleWithFixedDelay(() -> {
try {
localCommunicator.requestSensorData();
} catch (IOException e) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
"Cannot query data from device");
}
}, 2, config.pollingInterval, TimeUnit.SECONDS);
} else {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
"Commands and replies from the device don't seem to match");
logger.debug(
"Could not configure sensor -> setting Thing to OFFLINE, disposing the handler and reschedule initialize in {} seconds",
RETRY_INIT_DELAY);
doDispose(false);
retryInitJob = scheduler.schedule(this::initialize, RETRY_INIT_DELAY.getSeconds(), TimeUnit.SECONDS);
// start a job that reads the port until data arrives
int reportingReadStartDelay = 10;
int startReadBeforeDataArrives = 5;
long readReportedDataInterval = (config.reportingInterval * 60) - reportingReadStartDelay
- startReadBeforeDataArrives;
logger.trace("Scheduling job to receive reported values");
dataReadJob = scheduler.scheduleWithFixedDelay(() -> {
try {
localCommunicator.readSensorData();
} catch (IOException e) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
"Cannot query data from device, because: " + e.getMessage());
}
}, reportingReadStartDelay, readReportedDataInterval, TimeUnit.SECONDS);
}

Duration connectionMonitorStartDelay = timeBetweenDataShouldArrive.plus(CONNECTION_MONITOR_START_DELAY_OFFSET);
connectionMonitor = scheduler.scheduleWithFixedDelay(this::verifyIfStillConnected,
connectionMonitorStartDelay.getSeconds(), timeBetweenDataShouldArrive.getSeconds(), TimeUnit.SECONDS);
}

private boolean doInit(SDS011Communicator localCommunicator, WorkMode mode, Duration interval) {
boolean initSuccessful = false;
private void doInit(SDS011Communicator localCommunicator, WorkMode mode, Duration interval) {
try {
initSuccessful = localCommunicator.initialize(mode, interval);
localCommunicator.initialize(mode, interval);
} catch (final IOException ex) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, "I/O error!");
} catch (PortInUseException e) {
Expand All @@ -201,7 +182,6 @@ private boolean doInit(SDS011Communicator localCommunicator, WorkMode mode, Dura
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
"Cannot set serial port parameters");
}
return initSuccessful;
}

private boolean validateConfiguration() {
Expand Down Expand Up @@ -243,6 +223,12 @@ private void doDispose(boolean sendDeviceToSleep) {
this.connectionMonitor = null;
}

Future<?> localInitJob = this.initJob;
if (localInitJob != null) {
localInitJob.cancel(true);
this.initJob = null;
}

ScheduledFuture<?> localRetryOpenPortJob = this.retryInitJob;
if (localRetryOpenPortJob != null) {
localRetryOpenPortJob.cancel(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,8 @@
import org.openhab.binding.novafinedust.internal.SDS011Handler;
import org.openhab.binding.novafinedust.internal.sds011protocol.messages.CommandMessage;
import org.openhab.binding.novafinedust.internal.sds011protocol.messages.Constants;
import org.openhab.binding.novafinedust.internal.sds011protocol.messages.ModeReply;
import org.openhab.binding.novafinedust.internal.sds011protocol.messages.SensorFirmwareReply;
import org.openhab.binding.novafinedust.internal.sds011protocol.messages.SensorMeasuredDataReply;
import org.openhab.binding.novafinedust.internal.sds011protocol.messages.SensorReply;
import org.openhab.binding.novafinedust.internal.sds011protocol.messages.SleepReply;
import org.openhab.binding.novafinedust.internal.sds011protocol.messages.WorkingPeriodReply;
import org.openhab.core.io.transport.serial.PortInUseException;
import org.openhab.core.io.transport.serial.SerialPort;
import org.openhab.core.io.transport.serial.SerialPortIdentifier;
Expand All @@ -52,7 +48,7 @@
@NonNullByDefault
public class SDS011Communicator {

private static final int MAX_SENDOR_REPORTINGS_UNTIL_EXPECTED_REPLY = 20;
private static final int MAX_READ_UNTIL_SENSOR_DATA = 6; // at least 6 because we send 5 configuration commands

private final Logger logger = LoggerFactory.getLogger(SDS011Communicator.class);

Expand Down Expand Up @@ -82,9 +78,8 @@ public SDS011Communicator(SDS011Handler thingHandler, SerialPortIdentifier portI
* @throws IOException
* @throws UnsupportedCommOperationException
*/
public boolean initialize(WorkMode mode, Duration interval)
public void initialize(WorkMode mode, Duration interval)
throws PortInUseException, TooManyListenersException, IOException, UnsupportedCommOperationException {
boolean initSuccessful = true;

logger.trace("Initializing with mode={}, interval={}", mode, interval);

Expand All @@ -102,30 +97,28 @@ public boolean initialize(WorkMode mode, Duration interval)
logger.trace("Input and Outputstream opened for the port");

// wake up the device
initSuccessful &= sendSleep(false);
logger.trace("Wake up call done, initSuccessful={}", initSuccessful);
initSuccessful &= getFirmware();
logger.trace("Firmware requested, initSuccessful={}", initSuccessful);
sendSleep(false);
logger.trace("Wake up call done");
getFirmware();
logger.trace("Firmware requested");

if (mode == WorkMode.POLLING) {
initSuccessful &= setMode(WorkMode.POLLING);
logger.trace("Polling mode set, initSuccessful={}", initSuccessful);
initSuccessful &= setWorkingPeriod((byte) 0);
logger.trace("Working period for polling set, initSuccessful={}", initSuccessful);
setMode(WorkMode.POLLING);
logger.trace("Polling mode set");
setWorkingPeriod((byte) 0);
logger.trace("Working period for polling set");
} else {
// reporting
initSuccessful &= setWorkingPeriod((byte) interval.toMinutes());
logger.trace("Working period for reporting set, initSuccessful={}", initSuccessful);
initSuccessful &= setMode(WorkMode.REPORTING);
logger.trace("Reporting mode set, initSuccessful={}", initSuccessful);
setWorkingPeriod((byte) interval.toMinutes());
logger.trace("Working period for reporting set");
setMode(WorkMode.REPORTING);
logger.trace("Reporting mode set");
}

this.serialPort = localSerialPort;

return initSuccessful;
}

private @Nullable SensorReply sendCommand(CommandMessage message) throws IOException {
private void sendCommand(CommandMessage message) throws IOException {
byte[] commandData = message.getBytes();
if (logger.isDebugEnabled()) {
logger.debug("Will send command: {} ({})", HexUtils.bytesToHex(commandData), Arrays.toString(commandData));
Expand All @@ -139,23 +132,12 @@ public boolean initialize(WorkMode mode, Duration interval)
}

try {
// Give the sensor some time to handle the command
// Give the sensor some time to handle the command before doing something else with it
Thread.sleep(500);
} catch (InterruptedException e) {
logger.warn("Problem while waiting for reading a reply to our command.");
logger.warn("Interrupted while waiting after sending command={}", message);
Thread.currentThread().interrupt();
}
SensorReply reply = readReply();
// in case there is still another reporting active, we want to discard the sensor data and read the reply to our
// command again, this might happen more often in case the sensor has buffered some data
for (int i = 0; i < MAX_SENDOR_REPORTINGS_UNTIL_EXPECTED_REPLY; i++) {
if (reply instanceof SensorMeasuredDataReply) {
reply = readReply();
} else {
break;
}
}
return reply;
}

private void write(byte[] commandData) throws IOException {
Expand All @@ -166,79 +148,46 @@ private void write(byte[] commandData) throws IOException {
}
}

private boolean setWorkingPeriod(byte period) throws IOException {
private void setWorkingPeriod(byte period) throws IOException {
CommandMessage m = new CommandMessage(Command.WORKING_PERIOD, new byte[] { Constants.SET_ACTION, period });
logger.debug("Sending work period: {}", period);
SensorReply reply = sendCommand(m);
logger.debug("Got reply to setWorkingPeriod command: {}", reply);
if (reply instanceof WorkingPeriodReply) {
WorkingPeriodReply wpReply = (WorkingPeriodReply) reply;
if (wpReply.getPeriod() == period && wpReply.getActionType() == Constants.SET_ACTION) {
return true;
}
}
return false;
sendCommand(m);
}

private boolean setMode(WorkMode workMode) throws IOException {
private void setMode(WorkMode workMode) throws IOException {
byte haveToRequestData = 0;
if (workMode == WorkMode.POLLING) {
haveToRequestData = 1;
}

CommandMessage m = new CommandMessage(Command.MODE, new byte[] { Constants.SET_ACTION, haveToRequestData });
logger.debug("Sending mode: {}", workMode);
SensorReply reply = sendCommand(m);
logger.debug("Got reply to setMode command: {}", reply);
if (reply instanceof ModeReply) {
ModeReply mr = (ModeReply) reply;
if (mr.getActionType() == Constants.SET_ACTION && mr.getMode() == workMode) {
return true;
}
}
return false;
sendCommand(m);
}

private boolean sendSleep(boolean doSleep) throws IOException {
private void sendSleep(boolean doSleep) throws IOException {
byte payload = (byte) 1;
if (doSleep) {
payload = (byte) 0;
}

CommandMessage m = new CommandMessage(Command.SLEEP, new byte[] { Constants.SET_ACTION, payload });
logger.debug("Sending doSleep: {}", doSleep);
SensorReply reply = sendCommand(m);
logger.debug("Got reply to sendSleep command: {}", reply);
sendCommand(m);

// as it turns out, the protocol doesn't work as described: sometimes the device just wakes up without replying
// to us. Hence we should not wait for a reply, but just force to wake it up to then send out our configuration
// commands
if (!doSleep) {
// sometimes the sensor does not wakeup on the first attempt, thus we try again
for (int i = 0; reply == null && i < 3; i++) {
reply = sendCommand(m);
logger.debug("Got reply to sendSleep command after retry#{}: {}", i + 1, reply);
}
sendCommand(m);
}

if (reply instanceof SleepReply) {
SleepReply sr = (SleepReply) reply;
if (sr.getActionType() == Constants.SET_ACTION && sr.getSleep() == payload) {
return true;
}
}
return false;
}

private boolean getFirmware() throws IOException {
private void getFirmware() throws IOException {
CommandMessage m = new CommandMessage(Command.FIRMWARE, new byte[] {});
logger.debug("Sending get firmware request");
SensorReply reply = sendCommand(m);
logger.debug("Got reply to getFirmware command: {}", reply);

if (reply instanceof SensorFirmwareReply) {
SensorFirmwareReply fwReply = (SensorFirmwareReply) reply;
thingHandler.setFirmware(fwReply.getFirmware());
return true;
}
return false;
sendCommand(m);
}

/**
Expand All @@ -256,7 +205,7 @@ public void requestSensorData() throws IOException {
try {
Thread.sleep(200); // give the device some time to handle the command
} catch (InterruptedException e) {
logger.warn("Interrupted while waiting before reading a reply to our rquest data command.");
logger.warn("Interrupted while waiting before reading a reply to our request data command.");
Thread.currentThread().interrupt();
}
readSensorData();
Expand All @@ -271,7 +220,7 @@ public void requestSensorData() throws IOException {
if (localInpuStream != null) {
logger.trace("Reading for reply until first byte is found");
while ((b = localInpuStream.read()) != Constants.MESSAGE_START_AS_INT) {
logger.debug("Trying to find first reply byte now...");
// logger.trace("Trying to find first reply byte now...");
}
readBuffer[0] = (byte) b;
int remainingBytesRead = localInpuStream.read(readBuffer, 1, Constants.REPLY_LENGTH - 1);
Expand All @@ -286,16 +235,26 @@ public void requestSensorData() throws IOException {

public void readSensorData() throws IOException {
logger.trace("readSensorData() called");

boolean foundSensorData = doRead();
for (int i = 0; !foundSensorData && i < MAX_READ_UNTIL_SENSOR_DATA; i++) {
foundSensorData = doRead();
}
}

private boolean doRead() throws IOException {
SensorReply reply = readReply();
logger.trace("readSensorData(): Read reply={}", reply);
logger.trace("doRead(): Read reply={}", reply);
if (reply instanceof SensorMeasuredDataReply) {
SensorMeasuredDataReply sensorData = (SensorMeasuredDataReply) reply;
logger.trace("We received sensor data");
if (sensorData.isValidData()) {
logger.trace("Sensor data is valid => updating channels");
thingHandler.updateChannels(sensorData);
return true;
}
}
return false;
}

/**
Expand Down

0 comments on commit fe7b91f

Please sign in to comment.