Skip to content

Commit

Permalink
[ComfoAir] Optimize serial connection (openhab#5049)
Browse files Browse the repository at this point in the history
* Optimize serial connection

- Add RXTX CPU workaround
- Change Thread to SchedulerExecutorService
- Remove flush() on send, can cause blocking forever

* Removed commented out code
  • Loading branch information
csowada authored and watou committed Jan 31, 2017
1 parent c333b39 commit b5ba1bc
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Enumeration;
import java.util.TooManyListenersException;

import org.apache.commons.io.IOUtils;
import org.openhab.binding.comfoair.internal.InitializationException;
Expand All @@ -25,6 +26,8 @@
import gnu.io.NoSuchPortException;
import gnu.io.PortInUseException;
import gnu.io.SerialPort;
import gnu.io.SerialPortEvent;
import gnu.io.SerialPortEventListener;
import gnu.io.UnsupportedCommOperationException;

/**
Expand All @@ -50,7 +53,7 @@ public class ComfoAirConnector {

/**
* Open and initialize a serial port.
*
*
* @param portName
* e.g. /dev/ttyS0
* @param listener
Expand All @@ -72,6 +75,14 @@ public void open(String portName) throws InitializationException {
serialPort.setSerialPortParams(9600, SerialPort.DATABITS_8, SerialPort.STOPBITS_1,
SerialPort.PARITY_NONE);

serialPort.enableReceiveThreshold(1);
serialPort.enableReceiveTimeout(1000);

// RXTX serial port library causes high CPU load
// Start event listener, which will just sleep and slow down event loop
serialPort.addEventListener(new CPUWorkaroundThread());
serialPort.notifyOnDataAvailable(true);

inputStream = new DataInputStream(new BufferedInputStream(serialPort.getInputStream()));
outputStream = serialPort.getOutputStream();

Expand All @@ -84,10 +95,13 @@ public void open(String portName) throws InitializationException {
throw new InitializationException(e);
} catch (IOException e) {
throw new InitializationException(e);
} catch (TooManyListenersException e) {
throw new InitializationException(e);
}

} catch (NoSuchPortException e) {
StringBuilder sb = new StringBuilder();
@SuppressWarnings("rawtypes")
Enumeration portList = CommPortIdentifier.getPortIdentifiers();
while (portList.hasMoreElements()) {
CommPortIdentifier id = (CommPortIdentifier) portList.nextElement();
Expand Down Expand Up @@ -118,7 +132,7 @@ public void close() {

/**
* Prepare a command for sending using the serial port.
*
*
* @param command
* @return reply byte values
*/
Expand Down Expand Up @@ -264,7 +278,7 @@ public synchronized int[] sendCommand(ComfoAirCommand command) {
/**
* Generate the byte sequence for sending to ComfoAir (incl. START & END
* sequence and checksum).
*
*
* @param command
* @param data
* @return response byte value block with cmd, data and checksum
Expand Down Expand Up @@ -305,7 +319,7 @@ private byte[] calculateRequest(int command, int[] data) {

/**
* Calculates a checksum for a command block (cmd, data and checksum).
*
*
* @param block
* @return checksum byte value
*/
Expand All @@ -327,7 +341,7 @@ private byte calculateChecksum(byte[] block) {

/**
* Cleanup a commandblock from quoted 0x07 characters.
*
*
* @param processBuffer
* @return the 0x07 cleaned byte values
*/
Expand All @@ -353,7 +367,7 @@ private byte[] cleanupBlock(byte[] processBuffer) {

/**
* Escape special 0x07 character.
*
*
* @param cleanedBuffer
* @return escaped byte value array
*/
Expand Down Expand Up @@ -381,7 +395,7 @@ private byte[] escapeBlock(byte[] cleanedBuffer) {

/**
* Send the byte values.
*
*
* @param request
* @return successful flag
*/
Expand All @@ -390,7 +404,6 @@ private boolean send(byte[] request) {

try {
outputStream.write(request);
outputStream.flush();

return true;

Expand All @@ -402,7 +415,7 @@ private boolean send(byte[] request) {

/**
* Is used to debug byte values.
*
*
* @param data
* @return
*/
Expand All @@ -423,4 +436,15 @@ private String dumpData(byte[] data) {
}
return sb.toString();
}

private class CPUWorkaroundThread implements SerialPortEventListener {
@Override
public void serialEvent(SerialPortEvent event) {
try {
logger.trace("RXTX library CPU load workaround, sleep forever");
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.openhab.binding.comfoair.ComfoAirBindingProvider;
Expand All @@ -35,7 +38,7 @@
* @author Holger Hees
* @since 1.3.0
*/
public class ComfoAirBinding extends AbstractActiveBinding<ComfoAirBindingProvider>implements ManagedService {
public class ComfoAirBinding extends AbstractActiveBinding<ComfoAirBindingProvider> implements ManagedService {

static final Logger logger = LoggerFactory.getLogger(ComfoAirBinding.class);

Expand All @@ -48,11 +51,14 @@ public class ComfoAirBinding extends AbstractActiveBinding<ComfoAirBindingProvid

private ComfoAirConnector connector;

private ScheduledExecutorService scheduler;

/**
* @{inheritDoc
*/
@Override
public void activate() {
scheduler = Executors.newScheduledThreadPool(10);
}

/**
Expand All @@ -64,6 +70,15 @@ public void deactivate() {
provider.removeBindingChangeListener(this);
}

if (scheduler != null) {
scheduler.shutdown();
try {
scheduler.awaitTermination(5000, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.warn("Unable to shutdown scheduler!");
}
}

providers.clear();

if (connector != null) {
Expand Down Expand Up @@ -109,8 +124,8 @@ public void internalReceiveCommand(String itemName, Command command) {

if (affectedReadCommands.size() > 0) {
// refresh 3 seconds later all affected items
Thread updateThread = new AffectedItemsUpdateThread(affectedReadCommands);
updateThread.start();
Runnable updateThread = new AffectedItemsUpdateThread(affectedReadCommands);
scheduler.schedule(updateThread, 3, TimeUnit.SECONDS);
}
}
}
Expand All @@ -133,7 +148,7 @@ public void execute() {
/**
* send a command and send additional command which are affected by the
* first command
*
*
* @param command
*/
private void sendCommand(ComfoAirCommand command) {
Expand Down Expand Up @@ -215,13 +230,8 @@ public AffectedItemsUpdateThread(Collection<ComfoAirCommand> affectedReadCommand

@Override
public void run() {
try {
sleep(3000);
for (ComfoAirCommand readCommand : this.affectedReadCommands) {
sendCommand(readCommand);
}
} catch (InterruptedException e) {
// nothing to do ...
for (ComfoAirCommand readCommand : this.affectedReadCommands) {
sendCommand(readCommand);
}
}
}
Expand Down

0 comments on commit b5ba1bc

Please sign in to comment.