Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[velux] Improve shutdown exception handling #12356

Merged
merged 5 commits into from
Mar 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,18 @@ public synchronized byte[] io(VeluxBridgeHandler bridgeInstance, byte[] request)
} catch (IOException ioe) {
logger.info("io() on {}: Exception occurred during I/O: {}.", host, ioe.getMessage());
lastIOE = ioe;
// Error Retries with Exponential Backoff
long waitTime = ((long) Math.pow(2, retryCount)
* bridgeInstance.veluxBridgeConfiguration().timeoutMsecs);
logger.trace("io() on {}: wait time {} msecs.", host, waitTime);
try {
Thread.sleep(waitTime);
} catch (InterruptedException ie) {
logger.trace("io() on {}: wait interrupted.", host);
if (bridgeInstance.isDisposing()) {
break;
} else {
// Error Retries with Exponential Backoff
long waitTime = ((long) Math.pow(2, retryCount)
* bridgeInstance.veluxBridgeConfiguration().timeoutMsecs);
logger.trace("io() on {}: wait time {} msecs.", host, waitTime);
try {
Thread.sleep(waitTime);
} catch (InterruptedException ie) {
logger.trace("io() on {}: wait interrupted.", host);
}
}
}
} while (retryCount++ < bridgeInstance.veluxBridgeConfiguration().retries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
Expand All @@ -45,141 +44,34 @@
@NonNullByDefault
class DataInputStreamWithTimeout implements Closeable {

private static final int QUEUE_SIZE = 512;
private static final int BUFFER_SIZE = 512;
private static final int SLEEP_INTERVAL_MSECS = 50;

// special character that marks the first and last byte of a slip message
private static final byte SLIP_MARK = (byte) 0xc0;
private static final byte SLIP_PROT = 0;
private static final long MAX_WAIT_SECONDS = 15;

private final Logger logger = LoggerFactory.getLogger(DataInputStreamWithTimeout.class);

private final Queue<byte[]> slipMessageQueue = new ConcurrentLinkedQueue<>();
private final InputStream inputStream;
private final VeluxBridgeHandler bridge;

private InputStream inputStream;

private @Nullable String pollException = null;
private @Nullable Poller pollRunner = null;
private ExecutorService executor;

private class Poller implements Callable<Boolean> {

private boolean interrupted = false;
private Future<Boolean> pollerFinished;

public Poller(ExecutorService executor) {
logger.trace("Poller: created");
pollerFinished = executor.submit(this);
}

public void interrupt() {
interrupted = true;
try {
pollerFinished.get();
} catch (InterruptedException | ExecutionException e) {
}
}

/**
* Task that loops to read bytes from {@link InputStream} and build SLIP packets from them. The SLIP packets are
* placed in a {@link ConcurrentLinkedQueue}. It loops continuously until 'interrupt()' or 'Thread.interrupt()'
* are called when terminates early after the next socket read timeout.
*/
@Override
public Boolean call() throws Exception {
logger.trace("Poller.call(): started");
byte[] buf = new byte[BUFFER_SIZE];
int byt;
int i = 0;

// clean start, no exception, empty queue
pollException = null;
slipMessageQueue.clear();

// loop forever; on shutdown interrupt() gets called to break out of the loop
while (true) {
try {
if (interrupted) {
// fully flush the input buffer
inputStream.readAllBytes();
break;
}
byt = inputStream.read();
if (byt < 0) {
// end of stream is OK => keep on polling
continue;
}
buf[i] = (byte) byt;
if ((i > 0) && (buf[i] == SLIP_MARK)) {
// the minimal slip message is 7 bytes [MM PP LL CC CC KK MM]
if ((i > 5) && (buf[0] == SLIP_MARK) && (buf[1] == SLIP_PROT)) {
slipMessageQueue.offer(Arrays.copyOfRange(buf, 0, i + 1));
if (slipMessageQueue.size() > QUEUE_SIZE) {
logger.warn("Poller.call(): slip message queue overflow => PLEASE REPORT !!");
slipMessageQueue.poll();
}
i = 0;
} else {
logger.warn("Poller.call(): non slip messsage discarded => PLEASE REPORT !!");
buf[0] = SLIP_MARK;
i = 1;
}
continue;
}
if (++i >= BUFFER_SIZE) {
logger.warn("Poller.call(): input buffer overrun => PLEASE REPORT !!");
i = 0;
}
} catch (SocketTimeoutException e) {
// socket read time outs are OK => keep on polling; unless interrupted
if (interrupted) {
break;
}
continue;
} catch (IOException e) {
// any other exception => stop polling
String msg = e.getMessage();
pollException = msg != null ? msg : "Generic IOException";
logger.debug("Poller.call(): stopping '{}'", pollException);
break;
}
}

logger.trace("Poller.call(): ended");
// we only get here if shutdown or an error occurs so free ourself so we can be recreated again
pollRunner = null;
return true;
}
}

/**
* Check if there was an exception on the polling loop task and if so, throw it back on the caller thread.
*
* @throws IOException
*/
private void throwIfPollException() throws IOException {
if (pollException != null) {
logger.debug("passPollException() polling loop exception {}", pollException);
throw new IOException(pollException);
}
}
private @Nullable Poller poller;
private @Nullable Future<Boolean> future;
private @Nullable ExecutorService executor;

/**
* Creates a {@link DataInputStreamWithTimeout} as a wrapper around the specified underlying {@link InputStream}
*
* @param stream the specified input stream
* @param inputStream the specified input stream
* @param bridge the actual Bridge Thing instance
*/
public DataInputStreamWithTimeout(InputStream stream, VeluxBridgeHandler bridge) {
inputStream = stream;
executor = Executors.newSingleThreadExecutor(bridge.getThreadFactory());
public DataInputStreamWithTimeout(InputStream inputStream, VeluxBridgeHandler bridge) {
this.inputStream = inputStream;
this.bridge = bridge;
}

/**
* Overridden method of {@link Closeable} interface. Stops the polling thread.
* Overridden method of {@link Closeable} interface. Stops the polling task.
*
* @throws IOException
* @throws IOException (although actually no exceptions are thrown)
*/
@Override
public void close() throws IOException {
Expand All @@ -192,7 +84,8 @@ public void close() throws IOException {
*
* @param timeoutMSecs the timeout period in milliseconds.
* @return the next SLIP message if there is one on the queue, or any empty byte[] array if not.
* @throws IOException
* @throws IOException if the poller task has unexpectedly terminated e.g. via an IOException, or if either the
* poller task, or the calling thread have been interrupted
*/
public synchronized byte[] readSlipMessage(int timeoutMSecs) throws IOException {
startPolling();
Expand All @@ -203,16 +96,22 @@ public synchronized byte[] readSlipMessage(int timeoutMSecs) throws IOException
logger.trace("readSlipMessage() => return slip message");
return slip;
} catch (NoSuchElementException e) {
// queue empty, wait and continue
// queue empty, fall through and continue
}
throwIfPollException();
try {
Thread.sleep(SLEEP_INTERVAL_MSECS);
} catch (InterruptedException e) {
logger.debug("readSlipMessage() => thread interrupt");
Future<Boolean> future = this.future;
if ((future != null) && future.isDone()) {
future.get(); // throws ExecutionException, InterruptedException
// future terminated without exception, but prematurely, which is itself an exception
throw new IOException("Poller thread terminated prematurely");
}
Thread.sleep(SLEEP_INTERVAL_MSECS); // throws InterruptedException
} catch (ExecutionException | InterruptedException e) {
// re-cast other exceptions as IOException
throw new IOException(e);
}
}
logger.debug("readSlipMessage() => no slip message after {}mS => time out", timeoutMSecs);
logger.debug("readSlipMessage() => no slip message");
return new byte[0];
}

Expand All @@ -239,21 +138,44 @@ public void flush() {
* Start the polling task
*/
private void startPolling() {
if (pollRunner == null) {
logger.trace("startPolling()");
pollRunner = new Poller(executor);
if (future == null) {
logger.debug("startPolling() called");
slipMessageQueue.clear();
poller = new Poller(inputStream, slipMessageQueue);
executor = Executors.newSingleThreadExecutor(bridge.getThreadFactory());
future = executor.submit(poller);
}
}

/**
* Stop the polling task
*/
private void stopPolling() {
Poller pollRunner = this.pollRunner;
if (pollRunner != null) {
logger.trace("stopPolling()");
pollRunner.interrupt();
logger.debug("stopPolling() called");

Poller poller = this.poller;
Future<Boolean> future = this.future;
ExecutorService executor = this.executor;

this.poller = null;
this.future = null;
this.executor = null;

if (executor != null) {
executor.shutdown();
}
if (poller != null) {
poller.interrupt();
}
if (future != null) {
try {
future.get(MAX_WAIT_SECONDS, TimeUnit.SECONDS);
} catch (ExecutionException e) {
// expected exception due to e.g. IOException on socket close
} catch (TimeoutException | InterruptedException e) {
// unexpected exception due to e.g. KLF200 'zombie state'
logger.warn("stopPolling() exception '{}' => PLEASE REPORT !!", e.getMessage());
}
}
executor.shutdown();
}
}
Loading