diff --git a/bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/bridge/slip/io/Connection.java b/bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/bridge/slip/io/Connection.java index f0958e41a9d6d..dc80b0994d8f4 100644 --- a/bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/bridge/slip/io/Connection.java +++ b/bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/bridge/slip/io/Connection.java @@ -147,14 +147,18 @@ public synchronized byte[] io(VeluxBridgeHandler bridgeInstance, byte[] request) } catch (IOException ioe) { logger.info("io() on {}: Exception occurred during I/O: {}.", host, ioe.getMessage()); lastIOE = ioe; - // Error Retries with Exponential Backoff - long waitTime = ((long) Math.pow(2, retryCount) - * bridgeInstance.veluxBridgeConfiguration().timeoutMsecs); - logger.trace("io() on {}: wait time {} msecs.", host, waitTime); - try { - Thread.sleep(waitTime); - } catch (InterruptedException ie) { - logger.trace("io() on {}: wait interrupted.", host); + if (bridgeInstance.isDisposing()) { + break; + } else { + // Error Retries with Exponential Backoff + long waitTime = ((long) Math.pow(2, retryCount) + * bridgeInstance.veluxBridgeConfiguration().timeoutMsecs); + logger.trace("io() on {}: wait time {} msecs.", host, waitTime); + try { + Thread.sleep(waitTime); + } catch (InterruptedException ie) { + logger.trace("io() on {}: wait interrupted.", host); + } } } } while (retryCount++ < bridgeInstance.veluxBridgeConfiguration().retries); diff --git a/bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/bridge/slip/io/DataInputStreamWithTimeout.java b/bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/bridge/slip/io/DataInputStreamWithTimeout.java index 469ede68c7892..98c0d5620d881 100644 --- a/bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/bridge/slip/io/DataInputStreamWithTimeout.java +++ b/bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/bridge/slip/io/DataInputStreamWithTimeout.java @@ -15,16 +15,15 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; -import java.net.SocketTimeoutException; -import java.util.Arrays; import java.util.NoSuchElementException; import java.util.Queue; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; @@ -45,141 +44,34 @@ @NonNullByDefault class DataInputStreamWithTimeout implements Closeable { - private static final int QUEUE_SIZE = 512; - private static final int BUFFER_SIZE = 512; private static final int SLEEP_INTERVAL_MSECS = 50; - - // special character that marks the first and last byte of a slip message - private static final byte SLIP_MARK = (byte) 0xc0; - private static final byte SLIP_PROT = 0; + private static final long MAX_WAIT_SECONDS = 15; private final Logger logger = LoggerFactory.getLogger(DataInputStreamWithTimeout.class); private final Queue slipMessageQueue = new ConcurrentLinkedQueue<>(); + private final InputStream inputStream; + private final VeluxBridgeHandler bridge; - private InputStream inputStream; - - private @Nullable String pollException = null; - private @Nullable Poller pollRunner = null; - private ExecutorService executor; - - private class Poller implements Callable { - - private boolean interrupted = false; - private Future pollerFinished; - - public Poller(ExecutorService executor) { - logger.trace("Poller: created"); - pollerFinished = executor.submit(this); - } - - public void interrupt() { - interrupted = true; - try { - pollerFinished.get(); - } catch (InterruptedException | ExecutionException e) { - } - } - - /** - * Task that loops to read bytes from {@link InputStream} and build SLIP packets from them. The SLIP packets are - * placed in a {@link ConcurrentLinkedQueue}. It loops continuously until 'interrupt()' or 'Thread.interrupt()' - * are called when terminates early after the next socket read timeout. - */ - @Override - public Boolean call() throws Exception { - logger.trace("Poller.call(): started"); - byte[] buf = new byte[BUFFER_SIZE]; - int byt; - int i = 0; - - // clean start, no exception, empty queue - pollException = null; - slipMessageQueue.clear(); - - // loop forever; on shutdown interrupt() gets called to break out of the loop - while (true) { - try { - if (interrupted) { - // fully flush the input buffer - inputStream.readAllBytes(); - break; - } - byt = inputStream.read(); - if (byt < 0) { - // end of stream is OK => keep on polling - continue; - } - buf[i] = (byte) byt; - if ((i > 0) && (buf[i] == SLIP_MARK)) { - // the minimal slip message is 7 bytes [MM PP LL CC CC KK MM] - if ((i > 5) && (buf[0] == SLIP_MARK) && (buf[1] == SLIP_PROT)) { - slipMessageQueue.offer(Arrays.copyOfRange(buf, 0, i + 1)); - if (slipMessageQueue.size() > QUEUE_SIZE) { - logger.warn("Poller.call(): slip message queue overflow => PLEASE REPORT !!"); - slipMessageQueue.poll(); - } - i = 0; - } else { - logger.warn("Poller.call(): non slip messsage discarded => PLEASE REPORT !!"); - buf[0] = SLIP_MARK; - i = 1; - } - continue; - } - if (++i >= BUFFER_SIZE) { - logger.warn("Poller.call(): input buffer overrun => PLEASE REPORT !!"); - i = 0; - } - } catch (SocketTimeoutException e) { - // socket read time outs are OK => keep on polling; unless interrupted - if (interrupted) { - break; - } - continue; - } catch (IOException e) { - // any other exception => stop polling - String msg = e.getMessage(); - pollException = msg != null ? msg : "Generic IOException"; - logger.debug("Poller.call(): stopping '{}'", pollException); - break; - } - } - - logger.trace("Poller.call(): ended"); - // we only get here if shutdown or an error occurs so free ourself so we can be recreated again - pollRunner = null; - return true; - } - } - - /** - * Check if there was an exception on the polling loop task and if so, throw it back on the caller thread. - * - * @throws IOException - */ - private void throwIfPollException() throws IOException { - if (pollException != null) { - logger.debug("passPollException() polling loop exception {}", pollException); - throw new IOException(pollException); - } - } + private @Nullable Poller poller; + private @Nullable Future future; + private @Nullable ExecutorService executor; /** * Creates a {@link DataInputStreamWithTimeout} as a wrapper around the specified underlying {@link InputStream} * - * @param stream the specified input stream + * @param inputStream the specified input stream * @param bridge the actual Bridge Thing instance */ - public DataInputStreamWithTimeout(InputStream stream, VeluxBridgeHandler bridge) { - inputStream = stream; - executor = Executors.newSingleThreadExecutor(bridge.getThreadFactory()); + public DataInputStreamWithTimeout(InputStream inputStream, VeluxBridgeHandler bridge) { + this.inputStream = inputStream; + this.bridge = bridge; } /** - * Overridden method of {@link Closeable} interface. Stops the polling thread. + * Overridden method of {@link Closeable} interface. Stops the polling task. * - * @throws IOException + * @throws IOException (although actually no exceptions are thrown) */ @Override public void close() throws IOException { @@ -192,7 +84,8 @@ public void close() throws IOException { * * @param timeoutMSecs the timeout period in milliseconds. * @return the next SLIP message if there is one on the queue, or any empty byte[] array if not. - * @throws IOException + * @throws IOException if the poller task has unexpectedly terminated e.g. via an IOException, or if either the + * poller task, or the calling thread have been interrupted */ public synchronized byte[] readSlipMessage(int timeoutMSecs) throws IOException { startPolling(); @@ -203,16 +96,22 @@ public synchronized byte[] readSlipMessage(int timeoutMSecs) throws IOException logger.trace("readSlipMessage() => return slip message"); return slip; } catch (NoSuchElementException e) { - // queue empty, wait and continue + // queue empty, fall through and continue } - throwIfPollException(); try { - Thread.sleep(SLEEP_INTERVAL_MSECS); - } catch (InterruptedException e) { - logger.debug("readSlipMessage() => thread interrupt"); + Future future = this.future; + if ((future != null) && future.isDone()) { + future.get(); // throws ExecutionException, InterruptedException + // future terminated without exception, but prematurely, which is itself an exception + throw new IOException("Poller thread terminated prematurely"); + } + Thread.sleep(SLEEP_INTERVAL_MSECS); // throws InterruptedException + } catch (ExecutionException | InterruptedException e) { + // re-cast other exceptions as IOException + throw new IOException(e); } } - logger.debug("readSlipMessage() => no slip message after {}mS => time out", timeoutMSecs); + logger.debug("readSlipMessage() => no slip message"); return new byte[0]; } @@ -239,9 +138,12 @@ public void flush() { * Start the polling task */ private void startPolling() { - if (pollRunner == null) { - logger.trace("startPolling()"); - pollRunner = new Poller(executor); + if (future == null) { + logger.debug("startPolling() called"); + slipMessageQueue.clear(); + poller = new Poller(inputStream, slipMessageQueue); + executor = Executors.newSingleThreadExecutor(bridge.getThreadFactory()); + future = executor.submit(poller); } } @@ -249,11 +151,31 @@ private void startPolling() { * Stop the polling task */ private void stopPolling() { - Poller pollRunner = this.pollRunner; - if (pollRunner != null) { - logger.trace("stopPolling()"); - pollRunner.interrupt(); + logger.debug("stopPolling() called"); + + Poller poller = this.poller; + Future future = this.future; + ExecutorService executor = this.executor; + + this.poller = null; + this.future = null; + this.executor = null; + + if (executor != null) { + executor.shutdown(); + } + if (poller != null) { + poller.interrupt(); + } + if (future != null) { + try { + future.get(MAX_WAIT_SECONDS, TimeUnit.SECONDS); + } catch (ExecutionException e) { + // expected exception due to e.g. IOException on socket close + } catch (TimeoutException | InterruptedException e) { + // unexpected exception due to e.g. KLF200 'zombie state' + logger.warn("stopPolling() exception '{}' => PLEASE REPORT !!", e.getMessage()); + } } - executor.shutdown(); } } diff --git a/bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/bridge/slip/io/Poller.java b/bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/bridge/slip/io/Poller.java new file mode 100644 index 0000000000000..6c347e1472424 --- /dev/null +++ b/bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/bridge/slip/io/Poller.java @@ -0,0 +1,121 @@ +/** + * Copyright (c) 2010-2022 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.binding.velux.internal.bridge.slip.io; + +import java.io.IOException; +import java.io.InputStream; +import java.net.SocketTimeoutException; +import java.util.Arrays; +import java.util.Queue; +import java.util.concurrent.Callable; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class implements a Callable to read SLIP messages from the input stream. + * + * It implements a secondary polling thread to asynchronously read bytes from the socket input stream into a buffer. + * And it parses the bytes into SLIP messages, which are placed on a message queue. + * + * @author Andrew Fiddian-Green - Initial Contribution; refactored from private class in DataInputStreamWithTimeout + */ +@NonNullByDefault +public class Poller implements Callable { + + private static final int BUFFER_SIZE = 512; + private static final int QUEUE_SIZE = 512; + + // special character that marks the first and last byte of a slip message + private static final byte SLIP_MARK = (byte) 0xc0; + private static final byte SLIP_PROT = 0; + + private final Logger logger = LoggerFactory.getLogger(Poller.class); + + private final InputStream inputStream; + private final Queue messageQueue; + + private @Nullable volatile Thread thread; + + public Poller(InputStream stream, Queue queue) { + logger.trace("Poller: created"); + inputStream = stream; + messageQueue = queue; + } + + public void interrupt() { + Thread thread = this.thread; + if ((thread != null) && thread.isAlive()) { + thread.interrupt(); + } + } + + /** + * Task that loops to read bytes from inputStream and build SLIP packets from them. The SLIP packets are placed in + * messageQueue. It runs until 'interrupt()' or 'Thread.interrupt()' are called. + * + * @throws IOException in case of socket read errors + */ + @Override + public Boolean call() throws IOException { + thread = Thread.currentThread(); + logger.trace("Poller.call(): started"); + byte[] buf = new byte[BUFFER_SIZE]; + int byt; + int i = 0; + + while (!Thread.currentThread().isInterrupted()) { + try { + byt = inputStream.read(); // throws IOException + // end of stream is OK => continue polling + if (byt < 0) { + continue; + } + } catch (SocketTimeoutException e) { + // socket read time out is OK => continue polling + continue; + } + buf[i] = (byte) byt; + if ((i > 0) && (buf[i] == SLIP_MARK)) { + // the minimal slip message is 7 bytes [MM PP LL CC CC KK MM] + if ((i > 5) && (buf[0] == SLIP_MARK) && (buf[1] == SLIP_PROT)) { + messageQueue.offer(Arrays.copyOfRange(buf, 0, i + 1)); + if (messageQueue.size() > QUEUE_SIZE) { + logger.warn("Poller.call(): slip message queue overflow => PLEASE REPORT !!"); + messageQueue.poll(); + } + i = 0; + } else { + if (logger.isWarnEnabled()) { + StringBuilder sb = new StringBuilder(); + for (int j = 0; j <= i; j++) { + sb.append(String.format("%02X ", buf[j])); + } + logger.warn("Poller.call(): non slip messsage {} discarded => PLEASE REPORT !!", sb.toString()); + } + buf[0] = SLIP_MARK; + i = 1; + } + continue; + } + if (++i >= BUFFER_SIZE) { + logger.warn("Poller.call(): input buffer overrun => PLEASE REPORT !!"); + i = 0; + } + } + logger.trace("Poller.call(): completed"); + return true; + } +} diff --git a/bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/handler/VeluxBridgeHandler.java b/bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/handler/VeluxBridgeHandler.java index 87909569b8bd8..c8f39b38217b4 100644 --- a/bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/handler/VeluxBridgeHandler.java +++ b/bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/handler/VeluxBridgeHandler.java @@ -135,6 +135,7 @@ public class VeluxBridgeHandler extends ExtendedBaseBridgeHandler implements Vel private VeluxBridge myJsonBridge = new JsonVeluxBridge(this); private VeluxBridge mySlipBridge = new SlipVeluxBridge(this); + private boolean disposing = false; /* * ************************************** @@ -279,6 +280,7 @@ public void initialize() { veluxBridgeConfiguration = new VeluxBinding(getConfigAs(VeluxBridgeConfiguration.class)).checked(); scheduler.execute(() -> { + disposing = false; initializeSchedulerJob(); }); } @@ -314,6 +316,7 @@ private void initializeSchedulerJob() { @Override public void dispose() { scheduler.submit(() -> { + disposing = true; disposeSchedulerJob(); }); } @@ -882,4 +885,13 @@ public NamedThreadFactory getThreadFactory() { } return threadFactory; } + + /** + * Indicates if the bridge thing is being disposed. + * + * @return true if the bridge thing is being disposed. + */ + public boolean isDisposing() { + return disposing; + } }