Writing Peripheral Drivers

Sorin Guga edited this page May 1, 2020 · 8 revisions

What Is A Peripheral

The qToggle API allows a device to expose ports. These ports are usually ways to communicate with peripherals physically or logically attached to the device.

While a GPIO pin may be represented by a single boolean port, a Bluetooth thermostat or a 1-wire temperature & humidity sensor may require multiple ports to represent their state. These multiple ports must be managed together since they share the way they communicate with the peripheral.

To make it easier to manage multiple ports corresponding to the same peripheral, qToggleServer offers the peripherals package.

A Complete Peripheral Driver

Let's assume we have a temperature and humidity sensor attached to the device via a simple UART serial port. For the sake of simplicity, let's also assume that it speaks a very simple text protocol: it expects the ASCII text GET\n and, in response, it sends comma-separated temperature and humidity values, such as 22.5,73\n. Let's name our peripheral SerialTH.
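
To make the protocol concrete, here's a minimal sketch of parsing such a response, independent of any serial I/O. The parse_response name is made up for this illustration and is not part of qToggleServer:

```python
from typing import Tuple


def parse_response(data: bytes) -> Tuple[float, int]:
    """Parse a reply such as b'22.5,73' (plus newline) into (temperature, humidity)."""
    # Decode the ASCII payload and drop the trailing newline
    text = data.decode('ascii').strip()

    # Temperature and humidity are separated by a single comma
    parts = text.split(',')
    if len(parts) != 2:
        raise ValueError(f'unexpected response: {data!r}')

    return float(parts[0]), int(parts[1])


print(parse_response(b'22.5,73\n'))
```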

Peripheral Class

Our driver will live in a module called serialth.py and we're going to use the pyserial Python library to communicate with our peripheral. Here's what our peripheral class could look like:

serialth.py:
import serial
import time

from qtoggleserver.peripherals import Peripheral


class SerialTH(Peripheral):
    BAUD_RATE = 9600
    TIMEOUT = 0.1       # In seconds
    QUERY_INTERVAL = 5  # In seconds

    def __init__(self, name, serial_port):
        # Open serial port
        self._serial = serial.Serial(serial_port, self.BAUD_RATE, timeout=self.TIMEOUT)
        self._serial_port = serial_port

        # Remember last queried values
        self._last_query_time = 0
        self._last_values = (None, None)

        super().__init__(name)

    def query(self):
        # Send the request
        self._serial.write(b'GET\n')
        
        # Wait for response up to timeout seconds
        data = self._serial.readline()

        parts = data.split(b',')
        if len(parts) != 2:
            return None, None
        
        return float(parts[0]), int(parts[1])
    
    def get_values(self):
        now = time.time()
        if now - self._last_query_time > self.QUERY_INTERVAL:
            self._last_values = self.query()
            self._last_query_time = now
        
        return self._last_values

Each peripheral has a name. Names are normally given by the user of your driver (e.g. kitchen_thermometer) and will be used later, when building associated port ids. We also need to pass the serial_port as a parameter, to allow the user to specify their serial port, instead of hardcoding it.

We define a query method that offers a very simple implementation of the above text protocol. It returns a (temperature, humidity) tuple. The get_values method caches the result, so that the peripheral is queried at most once every 5 seconds.
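
The caching idea behind get_values can be tried out in isolation. Below is a small sketch, assuming a hypothetical CachedValues helper with an injectable clock (neither exists in qToggleServer); the fake clock just makes the timing easy to follow:

```python
import time
from typing import Callable, Tuple


class CachedValues:
    """Re-run `query` only when the cached result is older than `interval` seconds."""

    def __init__(self, query: Callable[[], Tuple], interval: float,
                 clock: Callable[[], float] = time.time) -> None:
        self._query = query
        self._interval = interval
        self._clock = clock
        self._last_time = 0.0
        self._last_values: Tuple = (None, None)

    def get(self) -> Tuple:
        now = self._clock()
        if now - self._last_time > self._interval:
            self._last_values = self._query()
            self._last_time = now
        return self._last_values


# Usage with a fake clock and a fake sensor query
calls = []
fake_now = [100.0]


def fake_query() -> Tuple[float, int]:
    calls.append(1)
    return 22.5, 73


cache = CachedValues(fake_query, interval=5, clock=lambda: fake_now[0])
cache.get()           # First call: queries the sensor
fake_now[0] = 103.0
cache.get()           # 3 seconds later: served from the cache
fake_now[0] = 106.0
cache.get()           # 6 seconds after the first query: queries again
print(len(calls))
```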

Temperature Port Class

The peripheral class above does nothing by itself. We have to define port classes (basically particular types of port drivers) that will actually help us expose our peripheral ports. Let's define a temperature port:

serialth.py:
from qtoggleserver.peripherals import Peripheral, PeripheralPort
...

class Temperature(PeripheralPort):
    TYPE = 'number'
    WRITABLE = False
    ID = 'temperature'

    async def read_value(self):
        values = self.get_peripheral().get_values()
        return values[0]

Peripheral ports must inherit from PeripheralPort. They have a get_peripheral() method that returns their associated peripheral instance. But how do we associate the port with the peripheral? We have to define the make_port_args method inside our peripheral class. It will return a list of port classes to be associated with the peripheral:

serialth.py:
...
class SerialTH(Peripheral):
    ...
    def make_port_args(self):
        return [
            Temperature
        ]
    ...

Please note that a port can be associated with only one peripheral!

Now let's actually use our new driver. Add the peripheral driver in your qtoggleserver.conf file (assuming serialth.py is somewhere in qToggleServer's Python path):

qtoggleserver.conf:
...
peripherals = [
    {
        driver = "serialth.SerialTH"
        name = "my_sensor"
        serial_port = "/dev/ttyUSB0"
    }
]
...

This will give you a read-only port with the id my_sensor.temperature.

Humidity Port Class

Let's now write a class for the humidity:

serialth.py:
class Humidity(PeripheralPort):
    TYPE = 'number'
    WRITABLE = False
    ID = 'humidity'

    async def read_value(self):
        values = self.get_peripheral().get_values()
        return values[1]

Don't forget to add it to the peripheral's make_port_args:

serialth.py:
...
class SerialTH(Peripheral):
    ...
    def make_port_args(self):
        return [
            Temperature,
            Humidity
        ]
    ...

This will give you a second port called my_sensor.humidity. They will share the same peripheral instance.

Base Peripheral Port Class

Even though the code so far isn't big, we don't like duplicated code and we prefer defining a common base class for our ports:

class SerialTHPort(PeripheralPort):
    TYPE = 'number'
    WRITABLE = False
    
    VALUE_INDEX = None

    async def read_value(self):
        values = self.get_peripheral().get_values()
        return values[self.VALUE_INDEX]


class Temperature(SerialTHPort):
    ID = 'temperature'
    VALUE_INDEX = 0


class Humidity(SerialTHPort):
    ID = 'humidity'
    VALUE_INDEX = 1

Extra Port Parameters

Suppose now that we want to specify a configurable offset to be added to the port values. We'll define corresponding arguments to the peripheral class constructor, given that users can only configure peripherals, not peripheral ports:

serialth.py:
...
class SerialTH(Peripheral):
    ...
    def __init__(self, name, serial_port, temperature_offset=0, humidity_offset=0):
        self._temperature_offset = temperature_offset
        self._humidity_offset = humidity_offset
        ...

    def make_port_args(self):
        return [
            {'driver': Temperature, 'offset': self._temperature_offset},
            {'driver': Humidity, 'offset': self._humidity_offset}
        ]
    ...

Note that, in make_port_args, we now return dictionaries instead of simple port classes. We do this to be able to supply arguments to the port class. We'll have to add the offset argument to the port class constructor as well:

serialth.py:
...
class SerialTHPort(PeripheralPort):
    ...
    def __init__(self, offset, peripheral):
        self._offset = offset
        super().__init__(peripheral)
    
    async def read_value(self):
        values = self.get_peripheral().get_values()
        value = values[self.VALUE_INDEX]
        if value is None:
            return

        return value + self._offset
...

We could use *args, **kwargs expressions to gather and pass all arguments to the super constructor; however, it is important to realize that the peripheral instance is given as an argument to the PeripheralPort constructor.
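
Here's a rough sketch of the **kwargs variant. BasePort is only a stand-in for PeripheralPort so that the snippet runs on its own, and the sketch assumes that constructor arguments are passed by keyword; check the actual PeripheralPort signature before relying on this:

```python
class BasePort:
    """Stand-in for PeripheralPort; it only models the `peripheral` argument."""

    def __init__(self, peripheral) -> None:
        self._peripheral = peripheral

    def get_peripheral(self):
        return self._peripheral


class OffsetPort(BasePort):
    def __init__(self, offset: float = 0, **kwargs) -> None:
        # Keep our own argument and forward everything else
        # (including `peripheral`) untouched to the base constructor
        self._offset = offset
        super().__init__(**kwargs)


port = OffsetPort(offset=2.5, peripheral='my_sensor')
print(port._offset, port.get_peripheral())
```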

Now make the corresponding changes in the configuration. We'll only supply an offset for the temperature:

qtoggleserver.conf:
...
peripherals = [
    ...
    {
        driver = "serialth.SerialTH"
        name = "my_sensor"
        temperature_offset = 2.5
        serial_port = "/dev/ttyUSB0"
    }
    ...
]
...

Custom Port IDs

With the peripheral's name set to my_sensor, our ids are my_sensor.temperature and my_sensor.humidity. But what if we want to use the serial port to build our ids instead of the peripheral name? We can override the make_id method:

serialth.py:
import re
...
class SerialTH(Peripheral):
    ...
    def get_serial_port(self):
        return self._serial_port
    ...

class SerialTHPort(PeripheralPort):
    ...
    def make_id(self):
        prefix = re.sub(r'[^a-z0-9]', '', self.get_peripheral().get_serial_port())
        return f'{prefix}.{self.ID}'
    ...

We add a get_serial_port method to the peripheral to obtain the serial port. For /dev/ttyUSB0, the resulting id is devtty0.temperature, because the pattern strips uppercase letters along with the slashes.
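
To see what the substitution does to a given path, it can be tried in isolation. The make_prefix helper and its lowercase_first flag are hypothetical; lowercasing the path first is one way to keep letters such as USB in the id:

```python
import re


def make_prefix(serial_port: str, lowercase_first: bool = False) -> str:
    # Hypothetical helper using the same substitution as make_id
    if lowercase_first:
        serial_port = serial_port.lower()

    # Drop everything that is not a lowercase letter or a digit
    return re.sub(r'[^a-z0-9]', '', serial_port)


print(make_prefix('/dev/ttyUSB0'))
print(make_prefix('/dev/ttyUSB0', lowercase_first=True))
```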

Enabling And Disabling

Peripherals have an internal enabled state, indicating if they are currently in use by ports or not. The enabled state of a peripheral is automatically managed by its ports: it's enabled if at least one of its ports is enabled.

To define the behavior of your peripheral when enabled or disabled, you can simply use the is_enabled method. In our example, we don't want to do any I/O with the serial port unless enabled:

serialth.py:
class SerialTH(Peripheral):
    ...
    def get_values(self):
        if not self.is_enabled():
            return None, None

        now = time.time()
        if now - self._last_query_time > self.QUERY_INTERVAL:
            self._last_values = self.query()
            self._last_query_time = now
        
        return self._last_values
    ...

This however doesn't help too much: the serial port will still be kept open while our peripheral is disabled. A better solution is to open the serial port on demand, using handle_enable and handle_disable methods:

serialth.py:
class SerialTH(Peripheral):
    ...
    def __init__(self, name, serial_port):
        # Serial port will be opened on demand
        self._serial = None
        self._serial_port = serial_port

        # Remember last queried values
        self._last_query_time = 0
        self._last_values = (None, None)

        super().__init__(name)
    ...
    
    async def handle_enable(self):
        # Open serial port
        self._serial = serial.Serial(self._serial_port, self.BAUD_RATE, timeout=self.TIMEOUT)
    
    async def handle_disable(self):
        # Close serial port
        self._serial.close()
        self._serial = None

Online And Offline

We would now like to let the user know when the peripheral is disconnected. We have the online standard attribute of a port that is designed to provide this information.

As soon as we detect that our peripheral is online, we simply call its set_online(True); similarly, as soon as we detect it going offline, we call set_online(False). In our example, being offline means that our peripheral does not respond to requests:

serialth.py:
class SerialTH(Peripheral):
    ...
    def query(self):
        # Send the request
        self._serial.write(b'GET\n')
        
        # Wait for response up to timeout seconds
        data = self._serial.readline()

        parts = data.split(b',')
        if len(parts) != 2:
            self.set_online(False)
            return None, None
        
        self.set_online(True)
        return float(parts[0]), int(parts[1])
    ...

The set_online method can safely be called multiple times with the same state; it only has an effect when the state actually changes.

Further customization of the peripheral class behavior when the online state changes can be done by overriding the handle_online and handle_offline methods. By default, they simply call trigger_port_update, which will trigger port-update events on all (enabled) associated ports, effectively updating their online attribute.
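
The change-detection behavior can be pictured with a toy model. OnlineTracker below is not qToggleServer code, and the initial offline state is an assumption; it only illustrates that repeated calls with the same state do nothing, while an actual change invokes the matching handler:

```python
class OnlineTracker:
    """Toy model: handlers run only when the online state actually changes."""

    def __init__(self) -> None:
        self._online = False  # Assumed initial state
        self.events = []

    def set_online(self, online: bool) -> None:
        if online == self._online:
            return  # Same state as before: nothing to do

        self._online = online
        if online:
            self.handle_online()
        else:
            self.handle_offline()

    def handle_online(self) -> None:
        self.events.append('online')

    def handle_offline(self) -> None:
        self.events.append('offline')


tracker = OnlineTracker()
for state in (True, True, False, False, True):
    tracker.set_online(state)

print(tracker.events)
```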

Cleaning Up

Our peripheral may start a task that requires a proper shut down when the server is stopped. Or it may open a resource that needs to be freed on a clean server shut down.

For the sake of our example, let's suppose we want to cleanly close the serial port when shutting down, even though that automatically happens. Cleaning up at the end of the peripheral lifecycle can be done by overriding the handle_cleanup method:

serialth.py:
class SerialTH(Peripheral):
    ...
    async def handle_cleanup(self):
        await super().handle_cleanup()
        if self._serial:
            self._serial.close()
            self._serial = None

Normally the _serial attribute should be None at this point, as the handle_disable method will automatically be called when the server is stopped.

Logging

What if we want some debugging information about how things go with our peripheral? The Peripheral class inherits from LoggableMixin, which means that we can call methods like error and debug on our peripheral objects:

serialth.py:
class SerialTH(Peripheral):
    ...
    def query(self):
        self.debug('requesting values')
    
        # Send the request
        self._serial.write(b'GET\n')
        
        # Wait for response up to timeout seconds
        data = self._serial.readline()

        parts = data.split(b',')
        if len(parts) != 2:
            self.set_online(False)
            self.error('got unexpected response: %s', data.decode())
            return None, None
        
        self.set_online(True)
        self.debug('got values: %s', data.decode())
        return float(parts[0]), int(parts[1])
    ...

By default, our peripheral's logger will have the base peripheral Python module logger as parent. To use the serialth module logger as parent, set the logger class attribute:

serialth.py:
import logging
...

logger = logging.getLogger(__name__)
...

class SerialTH(Peripheral):
    logger = logger
    ...

Exceptions

Let's make use of dedicated exceptions to signal errors that appear when communicating with our peripheral:

serialth.py:
class SerialTHException(Exception):
    pass


class Timeout(SerialTHException):
    pass


class UnexpectedResponse(SerialTHException):
    pass


class SerialTH(Peripheral):
    ...
    def query(self):
        self.debug('requesting values')

        # Send the request
        self._serial.write(b'GET\n')

        # Wait for response up to timeout seconds
        data = self._serial.readline()
        if not data:
            self.set_online(False)
            raise Timeout(f'Peripheral did not respond after {self.TIMEOUT} seconds')

        parts = data.split(b',')
        if len(parts) != 2:
            self.set_online(False)
            raise UnexpectedResponse(f'Unexpected data received: {data.decode()}')

        self.set_online(True)
        self.debug('got values: %s', data.decode())
        return float(parts[0]), int(parts[1])
    ...
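
One benefit of a common base exception is that callers can handle all peripheral errors in one place. Here's a self-contained sketch of that idea; the check_response and describe functions are made up for the illustration and leave out the serial I/O:

```python
class SerialTHException(Exception):
    pass


class Timeout(SerialTHException):
    pass


class UnexpectedResponse(SerialTHException):
    pass


def check_response(data: bytes):
    # Same checks as in query(), minus the serial I/O
    if not data:
        raise Timeout('Peripheral did not respond')

    parts = data.decode('ascii').split(',')
    if len(parts) != 2:
        raise UnexpectedResponse(f'Unexpected data received: {data!r}')

    return float(parts[0]), int(parts[1])


def describe(data: bytes) -> str:
    try:
        return f'values: {check_response(data)}'
    except SerialTHException as e:
        # One handler covers both Timeout and UnexpectedResponse
        return f'error: {type(e).__name__}'


print(describe(b'22.5,73\n'))
print(describe(b''))
print(describe(b'oops\n'))
```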

Blocking Code

So far we've got a peripheral that is queried every 5 seconds, at most, and is expected to respond within 100 ms. Not great, not terrible. Why? Well, the code is actually usable as is, but what if we have to deal with 10 sensors at the same time? If each of them takes the full 100 ms to respond, our server would be blocked for 1 second out of every 5; having our server blocked for 20% of its running time is far from great.
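
The arithmetic is easy to redo for other setups. The blocked_fraction helper is hypothetical and models the worst case, where every query waits for the full timeout:

```python
def blocked_fraction(sensors: int, timeout: float, interval: float) -> float:
    """Worst-case share of time the server spends blocked on serial reads."""
    return sensors * timeout / interval


# 10 sensors, 100 ms timeout each, queried every 5 seconds
print(f'{blocked_fraction(10, 0.1, 5):.0%}')
```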

But we can do better. We have (at least) the following options:

  1. Always read available serial data, without blocking, and use a buffer to gather bytes, until we have our response.
  2. Use pyserial-asyncio instead of pyserial.
  3. Use threading to do blocking I/O.

While the simplest approach in this particular case would be either (1) or (2), we're going to use number (3). Why? Because it is more general (we might encounter cases where code simply cannot be non-blocking).
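
For reference, the buffering part of option (1) could be sketched roughly as follows. The LineBuffer class is hypothetical; with pyserial, the chunks would typically come from a port opened with timeout=0, for instance via read(in_waiting):

```python
from typing import List


class LineBuffer:
    """Collect bytes as they arrive and hand out complete, newline-terminated lines."""

    def __init__(self) -> None:
        self._buffer = bytearray()

    def feed(self, chunk: bytes) -> List[bytes]:
        self._buffer.extend(chunk)

        lines = []
        while True:
            index = self._buffer.find(b'\n')
            if index < 0:
                break  # No complete line yet; keep the remainder buffered

            lines.append(bytes(self._buffer[:index]))
            del self._buffer[:index + 1]

        return lines


buf = LineBuffer()
print(buf.feed(b'22.'))        # Partial response: no line yet
print(buf.feed(b'5,73\n21'))   # Completes the first line, starts a second
print(buf.feed(b'.9,70\n'))    # Completes the second line
```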

The Peripheral class comes with a handy mechanism for running blocking code in a separate, dedicated thread. Everything is taken care of, including starting, feeding, collecting results and stopping the thread. All you have to do is call run_threaded with your blocking function and its arguments.

serialth.py:
class SerialTH(Peripheral):
    ...
    async def get_values(self):
        now = time.time()
        if now - self._last_query_time > self.QUERY_INTERVAL:
            self._last_values = await self.run_threaded(self.query)
            self._last_query_time = now
        
        return self._last_values
    ...

class SerialTHPort(PeripheralPort):
    ...
    async def read_value(self):
        values = await self.get_peripheral().get_values()
        value = values[self.VALUE_INDEX]
        if value is None:
            return

        return value + self._offset
    ...

Note that our get_values method is now async.
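
How run_threaded works internally is not covered here, but the general technique can be sketched with asyncio's standard run_in_executor, used below as a stand-in. The blocking_query and ticker functions are made up for the illustration; the point is that other coroutines keep running while the blocking call sits in a thread:

```python
import asyncio
import time


def blocking_query() -> tuple:
    # Stand-in for query(): pretend the sensor takes 100 ms to answer
    time.sleep(0.1)
    return 22.5, 73


async def ticker(ticks: list) -> None:
    # Keeps running on the event loop while the blocking call is in progress
    for _ in range(3):
        ticks.append(time.monotonic())
        await asyncio.sleep(0.02)


async def main() -> tuple:
    loop = asyncio.get_running_loop()
    ticks: list = []

    # Run the blocking function in the default thread pool and, meanwhile,
    # let the ticker coroutine use the event loop
    values, _ = await asyncio.gather(
        loop.run_in_executor(None, blocking_query),
        ticker(ticks),
    )

    return values, len(ticks)


print(asyncio.run(main()))
```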

Full Example

All ideas expressed above are put together into a full example, below. It also features type annotations, which were intentionally omitted in the code snippets above:

serialth.py:
import logging
import re
import serial
import time

from typing import Any, cast, List, Dict, Optional, Tuple

from qtoggleserver.peripherals import Peripheral, PeripheralPort


logger = logging.getLogger(__name__)


class SerialTHException(Exception):
    pass


class Timeout(SerialTHException):
    pass


class UnexpectedResponse(SerialTHException):
    pass


class SerialTH(Peripheral):
    BAUD_RATE = 9600
    TIMEOUT = 0.1       # In seconds
    QUERY_INTERVAL = 5  # In seconds

    logger = logger

    def __init__(self, name: str, serial_port: str, temperature_offset: float = 0, humidity_offset: float = 0) -> None:
        self._temperature_offset: float = temperature_offset
        self._humidity_offset: float = humidity_offset

        # Serial port will be opened on demand
        self._serial: Optional[serial.Serial] = None
        self._serial_port: str = serial_port

        # Remember last queried values
        self._last_query_time: float = 0
        self._last_values: Tuple[Optional[float], Optional[int]] = (None, None)

        super().__init__(name)

    def get_serial_port(self) -> str:
        return self._serial_port

    def make_port_args(self) -> List[Dict[str, Any]]:
        return [
            {'driver': Temperature, 'offset': self._temperature_offset},
            {'driver': Humidity, 'offset': self._humidity_offset}
        ]

    def query(self) -> Tuple[float, int]:
        self.debug('requesting values')

        # Send the request
        self._serial.write(b'GET\n')

        # Wait for response up to timeout seconds
        data = self._serial.readline()
        if not data:
            self.set_online(False)
            raise Timeout(f'Peripheral did not respond after {self.TIMEOUT} seconds')

        parts = data.split(b',')
        if len(parts) != 2:
            self.set_online(False)
            raise UnexpectedResponse(f'Unexpected data received: {data.decode()}')

        self.set_online(True)
        self.debug('got values: %s', data.decode())
        return float(parts[0]), int(parts[1])

    async def get_values(self) -> Tuple[Optional[float], Optional[int]]:
        now = time.time()
        if now - self._last_query_time > self.QUERY_INTERVAL:
            self._last_values = await self.run_threaded(self.query)
            self._last_query_time = now

        return self._last_values

    async def handle_enable(self) -> None:
        # Open serial port
        self._serial = serial.Serial(self._serial_port, self.BAUD_RATE, timeout=self.TIMEOUT)

    async def handle_disable(self) -> None:
        # Close serial port
        self._serial.close()
        self._serial = None

    async def handle_cleanup(self) -> None:
        await super().handle_cleanup()
        if self._serial:
            self._serial.close()
            self._serial = None


class SerialTHPort(PeripheralPort):
    TYPE = 'number'
    WRITABLE = False

    VALUE_INDEX = None

    def __init__(self, offset: float, peripheral: SerialTH) -> None:
        self._offset: float = offset
        super().__init__(peripheral)

    async def read_value(self) -> Optional[float]:
        serial_th = cast(SerialTH, self.get_peripheral())
        values = await serial_th.get_values()
        value = values[self.VALUE_INDEX]
        if value is None:
            return

        return value + self._offset

    def make_id(self) -> str:
        serial_th = cast(SerialTH, self.get_peripheral())
        prefix = re.sub(r'[^a-z0-9]', '', serial_th.get_serial_port())
        return f'{prefix}.{self.ID}'


class Temperature(SerialTHPort):
    ID = 'temperature'
    VALUE_INDEX = 0


class Humidity(SerialTHPort):
    ID = 'humidity'
    VALUE_INDEX = 1

Peripheral Libraries

qToggleServer provides libraries for the most common peripheral connections. Moreover, there are patterns that often apply to many peripherals, patterns for which qToggleServer provides a common, reusable implementation.

Polled Peripherals

Many peripherals need to be polled at regular intervals in order to obtain values. qToggleServer offers a polled module that helps with writing polled peripherals. Here's a code snippet exemplifying the idea:

mypolledperipheral.py:
from qtoggleserver.lib import polled


class MyPolledPeripheralError(Exception):
    pass

class MyPolledPeripheral(polled.PolledPeripheral):
    TIMEOUT = 5
    ...
    def __init__(self, name):
        self._data = None
        super().__init__(name)
        
    def read(self):
        return self._data
    
    async def poll(self):
        # Potentially async code that reads data and stores it in self._data
        ...
    

Reading from peripherals is often a blocking routine and thus we might not be able to use async code. We can use the run_threaded method to run blocking code inside a thread and handle it asynchronously in poll:

mypolledperipheral.py:
import asyncio
...
class MyPolledPeripheral(polled.PolledPeripheral):
    ...
    def read_sync(self):
        # Code that blocks while reading
        ...
        # Returns read value

    async def poll(self):
        try:
            future = self.run_threaded(self.read_sync)
            self._data = await asyncio.wait_for(future, timeout=self.TIMEOUT)

        except asyncio.TimeoutError as e:
            raise MyPolledPeripheralError('Timeout waiting for data from peripheral') from e
    ...
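
The timeout pattern used in poll can be tried without qToggleServer. In the sketch below, run_in_executor stands in for run_threaded, and PeripheralTimeout and read_sync are made-up names. Note that, with this stand-in, the timeout only stops the waiting; the worker thread still runs to completion in the background:

```python
import asyncio
import time


class PeripheralTimeout(Exception):
    pass


def read_sync(delay: float) -> str:
    time.sleep(delay)  # Stand-in for a blocking read
    return 'data'


async def poll(delay: float, timeout: float) -> str:
    loop = asyncio.get_running_loop()
    future = loop.run_in_executor(None, read_sync, delay)
    try:
        return await asyncio.wait_for(future, timeout=timeout)
    except asyncio.TimeoutError as e:
        raise PeripheralTimeout('Timeout waiting for data from peripheral') from e


async def main() -> list:
    # A fast read completes well within the timeout
    results = [await poll(delay=0.01, timeout=0.5)]

    # A slow read exceeds the timeout and is reported as an error
    try:
        await poll(delay=0.3, timeout=0.05)
    except PeripheralTimeout:
        results.append('timeout')

    return results


print(asyncio.run(main()))
```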

All ports that will be advertised by your make_port_args method must inherit from PolledPort:

mypolledperipheral.py:
...
class MyPolledPort(polled.PolledPort):
    # Set these to None to disable read interval attribute
    READ_INTERVAL_MIN = 0
    READ_INTERVAL_MAX = 86400
    READ_INTERVAL_STEP = 1
    READ_INTERVAL_MULTIPLIER = 1    # Set to 60 for minutes and 3600 for hours
    READ_INTERVAL_UNIT = None       # Will be automatically set to 'seconds', 'minutes' or 'hours'
...

Polled ports get an additional attribute called read_interval that controls how often to read data from the peripheral. When dealing with multiple ports on a peripheral, the polling rate will be adjusted in such a way that the read interval of each port is satisfied.
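
One plausible way to satisfy every port is to poll at the smallest requested interval. This is an assumption made for illustration, not a description of the actual polled module; the effective_poll_interval helper is hypothetical:

```python
from typing import Iterable, Optional


def effective_poll_interval(read_intervals: Iterable[float]) -> Optional[float]:
    """Pick a polling interval (in seconds) that is short enough for every port."""
    intervals = list(read_intervals)

    # Polling as often as the most demanding port satisfies all the others too
    return min(intervals) if intervals else None


print(effective_poll_interval([10, 60, 30]))
```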

OneWire

If you want to add support for a 1-wire peripheral to qToggleServer, you can use the onewire module. It is a specialized polled peripheral that uses the Linux 1-wire bus (/sys/bus/w1/devices) to access connected devices.

A concrete implementation example is the qtoggleserver-dallastemp add-on.

Bluetooth Low Energy

The ble module simplifies the development of drivers for Bluetooth Low Energy GATT-enabled peripherals.

See the qtoggleserver-eq3bt add-on for an implementation example.

Further Reading

The best way to familiarize yourself with peripheral driver code is to read through the source code of the various available add-ons.

You may wish to prepare a package of your peripheral driver, offering it as an add-on.