From 038d4210f0b6a784eac201716b28f8057b0fb677 Mon Sep 17 00:00:00 2001 From: Felix Fanghaenel <35657654+flxdot@users.noreply.github.com> Date: Fri, 19 Apr 2024 21:11:17 +0200 Subject: [PATCH] feat: Device peripherals (#32) Co-authored-by: github-actions[bot] Co-authored-by: flxdot --- .../.idea/py_carlos_database.iml | 13 + .../carlos/edge/device/__init__.py | 5 +- .../carlos/edge/device/config.py | 56 +- .../carlos/edge/device/config_test.py | 28 +- .../carlos/edge/device/driver/__init__.py | 0 .../carlos/edge/device/driver/_dhtxx.py | 159 ++++++ .../edge/device/driver/device_metrics.py | 37 ++ .../carlos/edge/device/driver/dht11.py | 16 + .../carlos/edge/device/driver/dht22.py | 16 + .../carlos/edge/device/driver/relay.py | 28 + .../carlos/edge/device/driver/si1145.py | 500 ++++++++++++++++++ .../carlos/edge/device/protocol/__init__.py | 4 + .../carlos/edge/device/protocol/_gpio_mock.py | 58 ++ .../carlos/edge/device/protocol/gpio.py | 17 + .../carlos/edge/device/protocol/i2c.py | 158 ++++++ .../carlos/edge/device/runtime.py | 55 +- lib/py_edge_device/poetry.lock | 74 ++- lib/py_edge_device/pyproject.toml | 10 +- lib/py_edge_device/tests/__init__.py | 0 .../tests/test_data/__init__.py | 6 + .../tests/test_data/device_config | 30 ++ .../carlos/edge/interface/device/__init__.py | 12 + .../carlos/edge/interface/device/driver.py | 247 +++++++++ .../edge/interface/device/driver_config.py | 131 +++++ .../interface/device/driver_config_test.py | 77 +++ .../edge/interface/device/driver_test.py | 250 +++++++++ lib/py_edge_interface/pyproject.toml | 4 +- services/device/device/cli/config.py | 54 ++ services/device/device/run.py | 5 +- services/device/poetry.lock | 62 +++ 30 files changed, 2064 insertions(+), 48 deletions(-) create mode 100644 lib/py_carlos_database/.idea/py_carlos_database.iml create mode 100644 lib/py_edge_device/carlos/edge/device/driver/__init__.py create mode 100644 lib/py_edge_device/carlos/edge/device/driver/_dhtxx.py create mode 100644 lib/py_edge_device/carlos/edge/device/driver/device_metrics.py create mode 100644 lib/py_edge_device/carlos/edge/device/driver/dht11.py create mode 100644 lib/py_edge_device/carlos/edge/device/driver/dht22.py create mode 100644 lib/py_edge_device/carlos/edge/device/driver/relay.py create mode 100644 lib/py_edge_device/carlos/edge/device/driver/si1145.py create mode 100644 lib/py_edge_device/carlos/edge/device/protocol/__init__.py create mode 100644 lib/py_edge_device/carlos/edge/device/protocol/_gpio_mock.py create mode 100644 lib/py_edge_device/carlos/edge/device/protocol/gpio.py create mode 100644 lib/py_edge_device/carlos/edge/device/protocol/i2c.py create mode 100644 lib/py_edge_device/tests/__init__.py create mode 100644 lib/py_edge_device/tests/test_data/__init__.py create mode 100644 lib/py_edge_device/tests/test_data/device_config create mode 100644 lib/py_edge_interface/carlos/edge/interface/device/__init__.py create mode 100644 lib/py_edge_interface/carlos/edge/interface/device/driver.py create mode 100644 lib/py_edge_interface/carlos/edge/interface/device/driver_config.py create mode 100644 lib/py_edge_interface/carlos/edge/interface/device/driver_config_test.py create mode 100644 lib/py_edge_interface/carlos/edge/interface/device/driver_test.py diff --git a/lib/py_carlos_database/.idea/py_carlos_database.iml b/lib/py_carlos_database/.idea/py_carlos_database.iml new file mode 100644 index 00000000..25e55379 --- /dev/null +++ b/lib/py_carlos_database/.idea/py_carlos_database.iml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/lib/py_edge_device/carlos/edge/device/__init__.py b/lib/py_edge_device/carlos/edge/device/__init__.py index a2f3875c..2d2b455c 100644 --- a/lib/py_edge_device/carlos/edge/device/__init__.py +++ b/lib/py_edge_device/carlos/edge/device/__init__.py @@ -1,4 +1,5 @@ -__all__ = ["DeviceRuntime", "read_config", "DeviceConfig"] +__all__ = [ + "DeviceRuntime", +] -from .config import DeviceConfig, read_config from .runtime import DeviceRuntime diff --git a/lib/py_edge_device/carlos/edge/device/config.py b/lib/py_edge_device/carlos/edge/device/config.py index 945be2c9..bfbd8229 100644 --- a/lib/py_edge_device/carlos/edge/device/config.py +++ b/lib/py_edge_device/carlos/edge/device/config.py @@ -2,10 +2,8 @@ configuration of the application.""" __all__ = [ - "DeviceConfig", - "read_config", + "load_drivers", "read_config_file", - "write_config", "write_config_file", ] @@ -13,17 +11,12 @@ from typing import TypeVar import yaml -from carlos.edge.interface import DeviceId -from pydantic import BaseModel, Field +from carlos.edge.interface.device import CarlosDriver, DriverConfig, DriverFactory +from loguru import logger +from pydantic import BaseModel from carlos.edge.device.constants import CONFIG_FILE_NAME - - -class DeviceConfig(BaseModel): - """Configures the pure device settings.""" - - device_id: DeviceId = Field(..., description="The unique identifier of the device.") - +from carlos.edge.device.driver.device_metrics import DeviceMetrics Config = TypeVar("Config", bound=BaseModel) @@ -50,15 +43,38 @@ def write_config_file(path: Path, config: Config): ) -def read_config() -> DeviceConfig: # pragma: no cover +def load_drivers(config_dir: Path | None = None) -> list[CarlosDriver]: """Reads the configuration from the default location.""" + config_dir = config_dir or Path.cwd() - return read_config_file( - path=Path.cwd() / CONFIG_FILE_NAME, - schema=DeviceConfig, - ) + with open(config_dir / CONFIG_FILE_NAME, "r") as file: + raw_config = yaml.safe_load(file) + + factory = DriverFactory() + try: + driver_configs = [factory.build(config) for config in raw_config["drivers"]] + except KeyError: # pragma: no cover + raise KeyError( + f"The configuration file {config_dir / CONFIG_FILE_NAME} must contain " + f"a 'drivers' key." + ) + + # We always want to have some device metrics + if not any(isinstance(driver, DeviceMetrics) for driver in driver_configs): + driver_configs.insert( + 0, + factory.build( + DriverConfig( + identifier="__device_metrics__", + driver_module=DeviceMetrics.__module__, + ).model_dump() + ), + ) + + logger.info( + f"Loaded {len(driver_configs)} IOs: " + f"{', '.join(str(io) for io in driver_configs)}" + ) -def write_config(config: DeviceConfig): # pragma: no cover - """Writes the configuration to the default location.""" - write_config_file(path=Path.cwd() / CONFIG_FILE_NAME, config=config) + return driver_configs diff --git a/lib/py_edge_device/carlos/edge/device/config_test.py b/lib/py_edge_device/carlos/edge/device/config_test.py index 69883af8..2883bad7 100644 --- a/lib/py_edge_device/carlos/edge/device/config_test.py +++ b/lib/py_edge_device/carlos/edge/device/config_test.py @@ -1,9 +1,10 @@ from pathlib import Path -from uuid import uuid4 import pytest +from carlos.edge.interface.device import AnalogInput, DigitalOutput, GpioDriverConfig -from carlos.edge.device.config import DeviceConfig, read_config_file, write_config_file +from carlos.edge.device.config import load_drivers, read_config_file, write_config_file +from tests.test_data import EXPECTED_IO_COUNT, TEST_DEVICE_WORKDIR def test_config_file_io(tmp_path: Path): @@ -12,10 +13,27 @@ def test_config_file_io(tmp_path: Path): cfg_path = tmp_path / "config" with pytest.raises(FileNotFoundError): - read_config_file(cfg_path, DeviceConfig) + read_config_file(cfg_path, GpioDriverConfig) - config = DeviceConfig(device_id=uuid4()) + config = GpioDriverConfig( + identifier="test-config-file-io", + driver_module="carlos.edge.device.driver.dht11", + direction="input", + pin=7, + ) write_config_file(cfg_path, config) - assert read_config_file(cfg_path, DeviceConfig) == config + assert read_config_file(cfg_path, GpioDriverConfig) == config + + +def test_load_io(): + """This test ensures that the IOs are loaded correctly.""" + + io = load_drivers(config_dir=TEST_DEVICE_WORKDIR) + + assert len(io) == EXPECTED_IO_COUNT, "The number of IOs does not match." + + assert all( + isinstance(io, (AnalogInput, DigitalOutput)) for io in io + ), "Not all IOs are of the correct type." diff --git a/lib/py_edge_device/carlos/edge/device/driver/__init__.py b/lib/py_edge_device/carlos/edge/device/driver/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lib/py_edge_device/carlos/edge/device/driver/_dhtxx.py b/lib/py_edge_device/carlos/edge/device/driver/_dhtxx.py new file mode 100644 index 00000000..38fce119 --- /dev/null +++ b/lib/py_edge_device/carlos/edge/device/driver/_dhtxx.py @@ -0,0 +1,159 @@ +__all__ = ["DHTXX", "DhtConfig", "DHTType"] + +from abc import ABC +from enum import StrEnum +from time import sleep +from typing import Literal + +from carlos.edge.interface.device import AnalogInput, GpioDriverConfig +from pydantic import Field + +from carlos.edge.device.protocol import GPIO + + +class DhtConfig(GpioDriverConfig): + """Configuration for a DHT sensor.""" + + direction: Literal["input"] = Field("input") + + +class DHTType(StrEnum): + DHT11 = "DHT11" + DHT22 = "DHT22" + + +class DHT: + """Code for Temperature & Humidity Sensor of Seeed Studio. + + Code is originally from, but modified to my needs: + http://wiki.seeedstudio.com/Grove-TemperatureAndHumidity_Sensor/ + """ + + PULSES_CNT = 41 + + MAX_CNT = 320 + + def __init__(self, dht_type: DHTType, pin: int): + """ + + :param dht_type: either DHTtype.DHT11 or DHTtype.22 + :param pin: gpio pin where the sensor is connected to + """ + + # store the pin and type + self._pin = pin + self._dht_type = dht_type + + GPIO.setup(self._pin, GPIO.OUT) + + def read(self) -> tuple[float, float]: + """Internal read method. + + http://www.ocfreaks.com/basics-interfacing-dht11-dht22-humidity-temperature-sensor-mcu/ + + :returns (humidity in %, temperature in °C)""" + + # Send Falling signal to trigger sensor output data + # Wait for 20ms to collect 42 bytes data + GPIO.setup(self._pin, GPIO.OUT) + GPIO.output(self._pin, GPIO.HIGH) + sleep(0.2) + GPIO.output(self._pin, GPIO.LOW) + sleep(0.018) + + GPIO.setup(self._pin, GPIO.IN) + + # a short delay needed + for _ in range(10): + pass + + # pullup by host 20-40 us + count = 0 + while GPIO.input(self._pin): + count += 1 + if count > self.MAX_CNT: + raise RuntimeError("pullup by host 20-40us failed") + + pulse_cnt = [0] * (2 * self.PULSES_CNT) + for pulse in range(0, self.PULSES_CNT * 2, 2): + while not GPIO.input(self._pin): + pulse_cnt[pulse] += 1 + if pulse_cnt[pulse] > self.MAX_CNT: + raise RuntimeError(f"pulldown by DHT timeout: {pulse}") + + while GPIO.input(self._pin): + pulse_cnt[pulse + 1] += 1 + if pulse_cnt[pulse + 1] > self.MAX_CNT: + if pulse == (self.PULSES_CNT - 1) * 2: + pass + raise RuntimeError(f"pullup by DHT timeout: {pulse}") + + total_cnt = 0 + for pulse in range(2, 2 * self.PULSES_CNT, 2): + total_cnt += pulse_cnt[pulse] + + # Low level (50 us) average counter + average_cnt = total_cnt / (self.PULSES_CNT - 1) + + data = "" + for pulse in range(3, 2 * self.PULSES_CNT, 2): + if pulse_cnt[pulse] > average_cnt: + data += "1" + else: + data += "0" + + byte0 = int(data[0:8], 2) + byte1 = int(data[8:16], 2) + byte2 = int(data[16:24], 2) + byte3 = int(data[24:32], 2) + crc_byte = int(data[32:40], 2) + + data_checksum = (byte0 + byte1 + byte2 + byte3) & 0xFF + if crc_byte != data_checksum: + raise RuntimeError("checksum error!") + + if self._dht_type == DHTType.DHT11: + humidity = float(byte0) + temperature = float(byte2) + else: + humidity = float(int(data[0:16], 2) * 0.1) + temperature = float(int(data[17:32], 2) * 0.2 * (0.5 - int(data[16], 2))) + + return temperature, humidity + + +class DHTXX(AnalogInput, ABC): + """DHTXX Temperature and Humidity Sensor.""" + + def __init__(self, config: GpioDriverConfig): + + super().__init__(config=config) + + self._dht: DHT | None = None + self._dht_type: DHTType | None = None + + def setup(self): + """Sets up the DHT11 sensor.""" + + self._dht = DHT(dht_type=self._dht_type, pin=self.config.pin) + + def read(self) -> dict[str, float]: + """Reads the temperature and humidity.""" + + assert self._dht is not None, "The DHT sensor has not been initialized." + + # Reading the DHT sensor is quite unreliable, as the device is not a real-time + # device. Thus, we just try it a couple of times and fail if it does not work. + last_error: Exception | None = None + for i in range(16): + try: + temperature, humidity = self._dht.read() + return { + "temperature": temperature, + "humidity": humidity, + } + except RuntimeError as ex: + last_error = ex + + assert last_error is not None + raise last_error diff --git a/lib/py_edge_device/carlos/edge/device/driver/device_metrics.py b/lib/py_edge_device/carlos/edge/device/driver/device_metrics.py new file mode 100644 index 00000000..5d8b2a06 --- /dev/null +++ b/lib/py_edge_device/carlos/edge/device/driver/device_metrics.py @@ -0,0 +1,37 @@ +import psutil +from carlos.edge.interface.device import AnalogInput, DriverConfig, DriverFactory + + +class DeviceMetrics(AnalogInput): + """Provides the metrics of the device.""" + + def __init__(self, config: DriverConfig): + + super().__init__(config=config) + + def setup(self): + pass + + def read(self) -> dict[str, float]: + """Reads the device metrics.""" + + return { + "cpu.load_percent": psutil.cpu_percent(interval=1.0), + "cpu.temperature": self._read_cpu_temp(), + "memory.usage_percent": psutil.virtual_memory().percent, + "disk.usage_percent": psutil.disk_usage("/").percent, + } + + @staticmethod + def _read_cpu_temp() -> float: + """Reads the CPU temperature.""" + try: + with open("/sys/class/thermal/thermal_zone0/temp") as f: + return float(f.read().strip()) / 1000 + except FileNotFoundError: + return 0.0 + + +DriverFactory().register( + driver_module=__name__, config=DriverConfig, factory=DeviceMetrics +) diff --git a/lib/py_edge_device/carlos/edge/device/driver/dht11.py b/lib/py_edge_device/carlos/edge/device/driver/dht11.py new file mode 100644 index 00000000..d91d10ac --- /dev/null +++ b/lib/py_edge_device/carlos/edge/device/driver/dht11.py @@ -0,0 +1,16 @@ +from carlos.edge.interface.device import DriverFactory, GpioDriverConfig + +from ._dhtxx import DHTXX, DhtConfig, DHTType + + +class DHT11(DHTXX): + """DHT11 Temperature and Humidity Sensor.""" + + def __init__(self, config: GpioDriverConfig): + + super().__init__(config=config) + + self._dht_type = DHTType.DHT11 + + +DriverFactory().register(driver_module=__name__, config=DhtConfig, factory=DHT11) diff --git a/lib/py_edge_device/carlos/edge/device/driver/dht22.py b/lib/py_edge_device/carlos/edge/device/driver/dht22.py new file mode 100644 index 00000000..f90c7a1a --- /dev/null +++ b/lib/py_edge_device/carlos/edge/device/driver/dht22.py @@ -0,0 +1,16 @@ +from carlos.edge.interface.device import DriverFactory, GpioDriverConfig + +from ._dhtxx import DHTXX, DhtConfig, DHTType + + +class DHT22(DHTXX): + """DHT22 Temperature and Humidity Sensor.""" + + def __init__(self, config: GpioDriverConfig): + + super().__init__(config=config) + + self._dht_type = DHTType.DHT22 + + +DriverFactory().register(driver_module=__name__, config=DhtConfig, factory=DHT22) diff --git a/lib/py_edge_device/carlos/edge/device/driver/relay.py b/lib/py_edge_device/carlos/edge/device/driver/relay.py new file mode 100644 index 00000000..a372ebd1 --- /dev/null +++ b/lib/py_edge_device/carlos/edge/device/driver/relay.py @@ -0,0 +1,28 @@ +from typing import Literal + +from carlos.edge.interface.device import DigitalOutput, DriverFactory, GpioDriverConfig +from pydantic import Field + +from carlos.edge.device.protocol import GPIO + + +class RelayConfig(GpioDriverConfig): + + direction: Literal["output"] = Field("output") + + +class Relay(DigitalOutput): + """Relay.""" + + def __init__(self, config: RelayConfig): + super().__init__(config=config) + + def setup(self): + GPIO.setup(self.config.pin, GPIO.OUT, initial=GPIO.LOW) + + def set(self, value: bool): + """Writes the value to the relay.""" + GPIO.output(self.config.pin, value) + + +DriverFactory().register(driver_module=__name__, config=RelayConfig, factory=Relay) diff --git a/lib/py_edge_device/carlos/edge/device/driver/si1145.py b/lib/py_edge_device/carlos/edge/device/driver/si1145.py new file mode 100644 index 00000000..ee4c00e9 --- /dev/null +++ b/lib/py_edge_device/carlos/edge/device/driver/si1145.py @@ -0,0 +1,500 @@ +import time +from typing import Literal + +from carlos.edge.interface.device import AnalogInput, DriverFactory, I2cDriverConfig +from pydantic import Field + +from carlos.edge.device.protocol import I2C + + +class Si1145Config(I2cDriverConfig): + + direction: Literal["input"] = Field("input") + + address: Literal["0x60"] = Field("0x60") + + +class SI1145(AnalogInput): + + def __init__(self, config: Si1145Config): + + if config.address_int != SDL_Pi_SI1145.ADDR: + raise ValueError( + f"The address of the SI1145 sensor must be 0x60. Got {config.address} instead" + ) + + super().__init__(config=config) + + self._si1145: SDL_Pi_SI1145 | None = None + + def setup(self): + + self._si1145 = SDL_Pi_SI1145() + + def read(self) -> dict[str, float]: + """Reads various light levels from the sensor.""" + + assert self._si1145 is not None, "The sensor has not been set up." + + vis_raw = self._si1145.read_visible() + vis_lux = self._si1145.convert_visible_to_lux(vis_raw) + ir_raw = self._si1145.read_ir() + ir_lux = self._si1145.convert_ir_to_lux(ir_raw) + uv_idx = self._si1145.read_uv_index() + + return { + "visual-light-raw": float(vis_raw), + "visual-light": float(vis_lux), + "infrared-light-raw": float(ir_raw), + "infrared-light": float(ir_lux), + "uv-index": float(uv_idx), + } + + +DriverFactory().register(driver_module=__name__, config=Si1145Config, factory=SI1145) + + +class SDL_Pi_SI1145: + """Interface to the SI1145 UV Light Sensor by Adafruit. + + Note: This sensor has a static I2C Address of 0x60. + + https://www.silabs.com/documents/public/data-sheets/Si1145-46-47.pdf + + Modified for medium range vis, IR SDL December 2016 and + non Adafruit I2C (interfers with others) + + Original Author: Joe Gutting + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. + """ + + # COMMANDS + PARAM_QUERY = 0x80 + PARAM_SET = 0xA0 + NOP = 0x0 + RESET = 0x01 + BUSADDR = 0x02 + PS_FORCE = 0x05 + ALS_FORCE = 0x06 + PSALS_FORCE = 0x07 + PS_PAUSE = 0x09 + ALS_PAUSE = 0x0A + PSALS_PAUSE = 0xB + PS_AUTO = 0x0D + ALS_AUTO = 0x0E + PSALS_AUTO = 0x0F + GET_CAL = 0x12 + + # Parameters + PARAM_I2CADDR = 0x00 + PARAM_CHLIST = 0x01 + PARAM_CHLIST_ENUV = 0x80 + PARAM_CHLIST_ENAUX = 0x40 + PARAM_CHLIST_ENALSIR = 0x20 + PARAM_CHLIST_ENALSVIS = 0x10 + PARAM_CHLIST_ENPS1 = 0x01 + PARAM_CHLIST_ENPS2 = 0x02 + PARAM_CHLIST_ENPS3 = 0x04 + + PARAM_PSLED12SEL = 0x02 + PARAM_PSLED12SEL_PS2NONE = 0x00 + PARAM_PSLED12SEL_PS2LED1 = 0x10 + PARAM_PSLED12SEL_PS2LED2 = 0x20 + PARAM_PSLED12SEL_PS2LED3 = 0x40 + PARAM_PSLED12SEL_PS1NONE = 0x00 + PARAM_PSLED12SEL_PS1LED1 = 0x01 + PARAM_PSLED12SEL_PS1LED2 = 0x02 + PARAM_PSLED12SEL_PS1LED3 = 0x04 + + PARAM_PSLED3SEL = 0x03 + PARAM_PSENCODE = 0x05 + PARAM_ALSENCODE = 0x06 + + PARAM_PS1ADCMUX = 0x07 + PARAM_PS2ADCMUX = 0x08 + PARAM_PS3ADCMUX = 0x09 + PARAM_PSADCOUNTER = 0x0A + PARAM_PSADCGAIN = 0x0B + PARAM_PSADCMISC = 0x0C + PARAM_PSADCMISC_RANGE = 0x20 + PARAM_PSADCMISC_PSMODE = 0x04 + + PARAM_ALSIRADCMUX = 0x0E + PARAM_AUXADCMUX = 0x0F + + PARAM_ALS_VIS_ADC_COUNTER = 0x10 + PARAM_ALS_VIS_ADC_GAIN = 0x11 + PARAM_ALS_VIS_ADC_MISC = 0x12 + PARAM_ALS_VIS_ADC_MISC_VISRANGE = 0x10 + # PARAM_ALS_VIS_ADC_MISC_VISRANGE = 0x00 + + PARAM_ALS_IR_ADC_COUNTER = 0x1D + PARAM_ALS_IR_ADC_GAIN = 0x1E + PARAM_ALS_IR_ADC_MISC = 0x1F + PARAM_ALS_IR_ADC_MISC_RANGE = 0x20 + # PARAM_ALS_IR_ADC_MISC_RANGE = 0x00 + + PARAM_ADCCOUNTER_511CLK = 0x70 + + PARAM_ADCMUX_SMALLIR = 0x00 + PARAM_ADCMUX_LARGEIR = 0x03 + + # REGISTERS + REG_PARTID = 0x00 + REG_REVID = 0x01 + REG_SEQID = 0x02 + + REG_INTCFG = 0x03 + REG_INTCFG_INTOE = 0x01 + REG_INTCFG_INTMODE = 0x02 + + REG_IRQEN = 0x04 + REG_IRQEN_ALSEVERYSAMPLE = 0x01 + REG_IRQEN_PS1EVERYSAMPLE = 0x04 + REG_IRQEN_PS2EVERYSAMPLE = 0x08 + REG_IRQEN_PS3EVERYSAMPLE = 0x10 + + REG_IRQMODE1 = 0x05 + REG_IRQMODE2 = 0x06 + + REG_HWKEY = 0x07 + REG_MEASRATE0 = 0x08 + REG_MEASRATE1 = 0x09 + REG_PSRATE = 0x0A + REG_PSLED21 = 0x0F + REG_PSLED3 = 0x10 + REG_UCOEFF0 = 0x13 + REG_UCOEFF1 = 0x14 + REG_UCOEFF2 = 0x15 + REG_UCOEFF3 = 0x16 + REG_PARAMWR = 0x17 + REG_COMMAND = 0x18 + REG_RESPONSE = 0x20 + REG_IRQSTAT = 0x21 + REG_IRQSTAT_ALS = 0x01 + + REG_ALSVISDATA0 = 0x22 + REG_ALSVISDATA1 = 0x23 + REG_ALSIRDATA0 = 0x24 + REG_ALSIRDATA1 = 0x25 + REG_PS1DATA0 = 0x26 + REG_PS1DATA1 = 0x27 + REG_PS2DATA0 = 0x28 + REG_PS2DATA1 = 0x29 + REG_PS3DATA0 = 0x2A + REG_PS3DATA1 = 0x2B + REG_UVINDEX0 = 0x2C + REG_UVINDEX1 = 0x2D + REG_PARAMRD = 0x2E + REG_CHIPSTAT = 0x30 + + # I2C Address + ADDR = 0x60 + + DARK_OFFSET_VIS = 259 + DARK_OFFSE_TIR = 253 + + def __init__(self): + """Constructor.""" + + self._i2c = I2C(address=SDL_Pi_SI1145.ADDR) + + self._reset() + self._load_calibration() + + # device reset + def _reset(self): + """Resets the device + + :return: + """ + + with self._i2c.lock: + self._i2c.write8(register=SDL_Pi_SI1145.REG_MEASRATE0, value=0x00) + self._i2c.write8(register=SDL_Pi_SI1145.REG_MEASRATE1, value=0x00) + self._i2c.write8(register=SDL_Pi_SI1145.REG_IRQEN, value=0x00) + self._i2c.write8(register=SDL_Pi_SI1145.REG_IRQMODE1, value=0x00) + self._i2c.write8(register=SDL_Pi_SI1145.REG_IRQMODE2, value=0x00) + self._i2c.write8(register=SDL_Pi_SI1145.REG_INTCFG, value=0x00) + self._i2c.write8(register=SDL_Pi_SI1145.REG_IRQSTAT, value=0xFF) + + self._i2c.write8( + register=SDL_Pi_SI1145.REG_COMMAND, value=SDL_Pi_SI1145.RESET + ) + time.sleep(0.01) + self._i2c.write8(register=SDL_Pi_SI1145.REG_HWKEY, value=0x17) + time.sleep(0.01) + + def write_param(self, parameter: int, value: int) -> int: + """Write Parameter to the Sensor.""" + + with self._i2c.lock: + self._i2c.write8(register=SDL_Pi_SI1145.REG_PARAMWR, value=value) + self._i2c.write8( + register=SDL_Pi_SI1145.REG_COMMAND, + value=parameter | SDL_Pi_SI1145.PARAM_SET, + ) + param_val = self._i2c.read_uint8(register=SDL_Pi_SI1145.REG_PARAMRD) + return param_val + + def read_param(self, parameter: int) -> int: + """Read Parameter from the Sensor.""" + + with self._i2c.lock: + self._i2c.write8( + register=SDL_Pi_SI1145.REG_COMMAND, + value=parameter | SDL_Pi_SI1145.PARAM_QUERY, + ) + return self._i2c.read_uint8(register=SDL_Pi_SI1145.REG_PARAMRD) + + # load calibration to sensor + def _load_calibration(self): + """Load calibration data.""" + + with self._i2c.lock: + # Enable UVindex measurement coefficients! + self._i2c.write8(register=SDL_Pi_SI1145.REG_UCOEFF0, value=0x29) + self._i2c.write8(register=SDL_Pi_SI1145.REG_UCOEFF1, value=0x89) + self._i2c.write8(register=SDL_Pi_SI1145.REG_UCOEFF2, value=0x02) + self._i2c.write8(register=SDL_Pi_SI1145.REG_UCOEFF3, value=0x00) + + # Enable UV sensor + self.write_param( + parameter=SDL_Pi_SI1145.PARAM_CHLIST, + value=SDL_Pi_SI1145.PARAM_CHLIST_ENUV + | SDL_Pi_SI1145.PARAM_CHLIST_ENALSIR + | SDL_Pi_SI1145.PARAM_CHLIST_ENALSVIS + | SDL_Pi_SI1145.PARAM_CHLIST_ENPS1, + ) + + # Enable interrupt on every sample + self._i2c.write8( + register=SDL_Pi_SI1145.REG_INTCFG, + value=SDL_Pi_SI1145.REG_INTCFG_INTOE, + ) + self._i2c.write8( + register=SDL_Pi_SI1145.REG_IRQEN, + value=SDL_Pi_SI1145.REG_IRQEN_ALSEVERYSAMPLE, + ) + + # /****************************** Prox Sense 1 */ + + # Program LED current + self._i2c.write8( + register=SDL_Pi_SI1145.REG_PSLED21, value=0x03 + ) # 20mA for LED 1 only + self.write_param( + parameter=SDL_Pi_SI1145.PARAM_PS1ADCMUX, + value=SDL_Pi_SI1145.PARAM_ADCMUX_LARGEIR, + ) + + # Prox sensor #1 uses LED #1 + self.write_param( + parameter=SDL_Pi_SI1145.PARAM_PSLED12SEL, + value=SDL_Pi_SI1145.PARAM_PSLED12SEL_PS1LED1, + ) + + # Fastest clocks, clock div 1 + self.write_param(parameter=SDL_Pi_SI1145.PARAM_PSADCGAIN, value=0x00) + + # Take 511 clocks to measure + self.write_param( + parameter=SDL_Pi_SI1145.PARAM_PSADCOUNTER, + value=SDL_Pi_SI1145.PARAM_ADCCOUNTER_511CLK, + ) + + # in prox mode, high range + self.write_param( + parameter=SDL_Pi_SI1145.PARAM_PSADCMISC, + value=SDL_Pi_SI1145.PARAM_PSADCMISC_RANGE + | SDL_Pi_SI1145.PARAM_PSADCMISC_PSMODE, + ) + self.write_param( + parameter=SDL_Pi_SI1145.PARAM_ALSIRADCMUX, + value=SDL_Pi_SI1145.PARAM_ADCMUX_SMALLIR, + ) + + # Fastest clocks, clock div 1 + self.write_param( + parameter=SDL_Pi_SI1145.PARAM_ALS_IR_ADC_GAIN, + value=0, + # value=4 + ) + + # Take 511 clocks to measure + self.write_param( + parameter=SDL_Pi_SI1145.PARAM_ALS_IR_ADC_COUNTER, + value=SDL_Pi_SI1145.PARAM_ADCCOUNTER_511CLK, + ) + + # in high range mode + self.write_param( + parameter=SDL_Pi_SI1145.PARAM_ALS_IR_ADC_MISC, + value=0, + # value=SDL_Pi_SI1145.PARAM_ALS_IR_ADC_MISC_RANGE + ) + + # fastest clocks, clock div 1 + self.write_param( + parameter=SDL_Pi_SI1145.PARAM_ALS_VIS_ADC_GAIN, + value=0, + # value=4 + ) + + # Take 511 clocks to measure + self.write_param( + parameter=SDL_Pi_SI1145.PARAM_ALS_VIS_ADC_COUNTER, + value=SDL_Pi_SI1145.PARAM_ADCCOUNTER_511CLK, + ) + + # in high range mode (not normal signal) + self.write_param( + parameter=SDL_Pi_SI1145.PARAM_ALS_VIS_ADC_MISC, + value=0, + # value=SDL_Pi_SI1145.PARAM_ALS_VIS_ADC_MISC_VISRANGE + ) + + # measurement rate for auto + self._i2c.write8( + register=SDL_Pi_SI1145.REG_MEASRATE0, value=0xFF + ) # 255 * 31.25uS = 8ms + + # auto run + self._i2c.write8( + register=SDL_Pi_SI1145.REG_COMMAND, value=SDL_Pi_SI1145.PSALS_AUTO + ) + + def read_visible(self) -> int: + """returns visible + IR light levels""" + + with self._i2c.lock: + return self._i2c.read_uint16(register=SDL_Pi_SI1145.REG_ALSVISDATA0) + + def read_visible_lux(self) -> float: + """returns visible + IR light levels in lux""" + + return self.convert_visible_to_lux(self.read_visible()) + + def read_ir(self) -> int: + """returns IR light levels""" + + with self._i2c.lock: + return self._i2c.read_uint16(register=SDL_Pi_SI1145.REG_ALSIRDATA0) + + def read_ir_lux(self) -> float: + """returns IR light levels in lux""" + + return self.convert_ir_to_lux(self.read_ir()) + + def read_prox(self) -> int: + """Returns "Proximity" - assumes an IR LED is attached to LED""" + + with self._i2c.lock: + return self._i2c.read_uint16(register=SDL_Pi_SI1145.REG_PS1DATA0) + + def read_uv(self) -> int: + """Returns the UV index * 100 (divide by 100 to get the index)""" + + with self._i2c.lock: + # apply additional calibration of /10 based on sunlight + return self._i2c.read_uint16(register=SDL_Pi_SI1145.REG_UVINDEX0) / 10 + + def read_uv_index(self) -> float: + """Returns the UV Index.""" + + return self.read_uv() / 100 + + def convert_ir_to_lux(self, ir: int) -> float: + """Converts IR levels to lux.""" + + return self._convert_raw_to_lux( + raw=ir, + dark_offset=SDL_Pi_SI1145.DARK_OFFSE_TIR, + calibration_factor=50, # calibration factor to sunlight applied + param_adc_gain=SDL_Pi_SI1145.PARAM_ALS_IR_ADC_GAIN, + param_adc_misc=SDL_Pi_SI1145.PARAM_ALS_IR_ADC_MISC, + ) + + @staticmethod + def convert_uv_to_index(uv: int) -> float: + """Converts the read UV values to UV index.""" + + return uv / 100 + + def convert_visible_to_lux(self, vis: int) -> float: + """Converts the visible light level to lux.""" + + # Param 1: ALS_VIS_ADC_MISC + return self._convert_raw_to_lux( + raw=vis, + dark_offset=SDL_Pi_SI1145.DARK_OFFSET_VIS, + calibration_factor=100, # calibration to bright sunlight added + param_adc_gain=SDL_Pi_SI1145.PARAM_ALS_VIS_ADC_GAIN, + param_adc_misc=SDL_Pi_SI1145.PARAM_ALS_VIS_ADC_MISC, + ) + + def _convert_raw_to_lux( + self, + raw: int, + dark_offset: int, + calibration_factor: int, + param_adc_gain: int, + param_adc_misc: int, + ) -> float: + """Converts a raw input to Lux by applying a dark offset and a calibration + factor.""" + + raw = raw - dark_offset + if raw < 0: + raw = 0 + + lux = 2.44 + + # Get gain + gain = 1 + # These are set to defaults in the Adafruit driver_module - + # need to change if you change them in the SI1145 driver_module + # range_ = self.read_param(parameter=adc_misc) + # if (range_ & 32) == 32: + # gain = 14.5 + + # Get sensitivity + multiplier = 1 + # These are set to defaults in the Adafruit driver_module - + # need to change if you change them in the SI1145 driver_module + # sensitivity = self.read_param(parameter=adc_gain) + # if (sensitivity & 7) == 0: + # multiplier = 1 + # if (sensitivity & 7) == 1: + # multiplier = 2 + # if (sensitivity & 7) == 2: + # multiplier = 4 + # if (sensitivity & 7) == 3: + # multiplier = 8 + # if (sensitivity & 7) == 4: + # multiplier = 16 + # if (sensitivity & 7) == 5: + # multiplier = 32 + # if (sensitivity & 7) == 6: + # multiplier = 64 + # if (sensitivity & 7) == 7: + # multiplier = 128 + + return raw * (gain / (lux * multiplier)) * calibration_factor diff --git a/lib/py_edge_device/carlos/edge/device/protocol/__init__.py b/lib/py_edge_device/carlos/edge/device/protocol/__init__.py new file mode 100644 index 00000000..8f3cbe0a --- /dev/null +++ b/lib/py_edge_device/carlos/edge/device/protocol/__init__.py @@ -0,0 +1,4 @@ +__all__ = ["GPIO", "I2C", "I2cLock"] + +from .gpio import GPIO +from .i2c import I2C, I2cLock diff --git a/lib/py_edge_device/carlos/edge/device/protocol/_gpio_mock.py b/lib/py_edge_device/carlos/edge/device/protocol/_gpio_mock.py new file mode 100644 index 00000000..0ae44470 --- /dev/null +++ b/lib/py_edge_device/carlos/edge/device/protocol/_gpio_mock.py @@ -0,0 +1,58 @@ +__all__ = ["GPIO"] + +from loguru import logger + + +class GpioMock: # pragma: no cover + + LOW = 0 + HIGH = 1 + + IN = 0 + OUT = 1 + + PUD_OFF = 0 + PUD_DOWN = 1 + PUD_UP = 2 + + BOARD = 10 + BCM = 11 + + def __init__(self): + self._pins: list[int] = [] + + def setwarnings(self, state: bool): + pass + + def setmode(self, mode: int): + pass + + def setup(self, pin: int | list[int], mode: int): + if isinstance(pin, list): + for p in pin: + self.setup(p, mode) + return + + if pin not in self._pins: + self._pins.append(pin) + logger.debug(f"Setup pin {pin} in mode {mode}") + + def output(self, pin: int, state: bool): + if pin in self._pins: + logger.debug(f"Set pin {pin} to {'HIGH' if state else 'LOW'}") + else: + raise ValueError(f"Pin {pin} not set up") + + def input(self, pin: int) -> bool: + if pin in self._pins: + logger.debug(f"Reading input from pin {pin}") + return False # Dummy value, always returning False for simplicity + else: + raise ValueError(f"Pin {pin} not set up") + + def cleanup(self): + self._pins.clear() + logger.debug("Cleaned up GPIO pins") + + +GPIO = GpioMock() diff --git a/lib/py_edge_device/carlos/edge/device/protocol/gpio.py b/lib/py_edge_device/carlos/edge/device/protocol/gpio.py new file mode 100644 index 00000000..39c612dd --- /dev/null +++ b/lib/py_edge_device/carlos/edge/device/protocol/gpio.py @@ -0,0 +1,17 @@ +__all__ = ["GPIO"] + +import traceback +import warnings + +try: + from RPi import GPIO # type: ignore +except ImportError: + warnings.warn( + "RPi.GPIO not available. Fallback to mocked GPIO instead. " + f"{traceback.format_exc()}" + ) + from ._gpio_mock import GPIO # type: ignore + +# Choose the GPIO mode globally +GPIO.setmode(GPIO.BCM) +GPIO.setwarnings(False) diff --git a/lib/py_edge_device/carlos/edge/device/protocol/i2c.py b/lib/py_edge_device/carlos/edge/device/protocol/i2c.py new file mode 100644 index 00000000..bfa24dd4 --- /dev/null +++ b/lib/py_edge_device/carlos/edge/device/protocol/i2c.py @@ -0,0 +1,158 @@ +__all__ = ["I2cLock", "I2C"] + +import re +from threading import RLock +from typing import Sequence + +import smbus2 + +I2C_LOCK = RLock() + + +class I2cLock: # pragma: no cover + def __new__(cls): + return I2C_LOCK + + +class I2C: # pragma: no cover + """This class is based on, but heavily modified from, the Adafruit_I2C class""" + + def __init__(self, address: int, bus: int | None = None): + """Creates an instance of the I2C class. + + :param address: The I2C address of the device. + :param bus: The I2C bus number. If None, the bus number is auto-detected. + """ + + self.address = address + # By default, the correct I2C bus is auto-detected using /proc/cpuinfo + # Alternatively, you can hard-code the bus version below: + # self.bus = smbus2.SMBus(0); # Force I2C0 (early 256MB Pi's) + # self.bus = smbus2.SMBus(1); # Force I2C1 (512MB Pi's) + self.bus = smbus2.SMBus( + bus=bus if bus is not None else I2C.get_pi_i2v_bus_number() + ) + + @property + def lock(self) -> RLock: + """Returns the i2c lock""" + return I2C_LOCK + + def write8(self, register: int, value: int): + """Writes an 8-bit value to the specified register/address""" + try: + self.bus.write_byte_data( + i2c_addr=self.address, register=register, value=value + ) + except IOError: + raise IOError( + f"Error accessing 0x{self.address:0x}: Check your I2C address." + ) + + def write16(self, register: int, value: int): + """Writes a 16-bit value to the specified register/address pair""" + try: + self.bus.write_word_data( + i2c_addr=self.address, register=register, value=value + ) + except IOError: + raise IOError( + f"Error accessing 0x{self.address:0x}: Check your I2C address." + ) + + def write_raw8(self, value: int): + """Writes an 8-bit value on the bus""" + try: + self.bus.write_byte(i2c_addr=self.address, value=value) + except IOError: + raise IOError( + f"Error accessing 0x{self.address:0x}: Check your I2C address." + ) + + def write_list(self, register: int, data: Sequence[int]): + """Writes an array of bytes using I2C format""" + try: + self.bus.write_i2c_block_data( + i2c_addr=self.address, register=register, data=data + ) + except IOError: + raise IOError( + f"Error accessing 0x{self.address:0x}: Check your I2C address." + ) + + def read_list(self, register: int, length: int): + """Read a list of bytes from the I2C device""" + try: + return self.bus.read_i2c_block_data( + i2c_addr=self.address, register=register, length=length + ) + except IOError: + raise IOError( + f"Error accessing 0x{self.address:0x}: Check your I2C address." + ) + + def read_uint8(self, register: int) -> int: + """Read an unsigned byte from the I2C device""" + try: + return self.bus.read_byte_data(i2c_addr=self.address, register=register) + except IOError: + raise IOError( + f"Error accessing 0x{self.address:0x}: Check your I2C address." + ) + + def read_int8(self, register: int): + """Reads a signed byte from the I2C device""" + result = self.read_uint8(register=register) + if result > 127: + result -= 256 + return result + + def read_uint16(self, register: int, little_endian: bool = True): + """Reads an unsigned 16-bit value from the I2C device""" + try: + result = self.bus.read_word_data(i2c_addr=self.address, register=register) + # Swap bytes if using big endian because read_word_data assumes little + # endian on ARM (little endian) systems. + if not little_endian: + result = ((result << 8) & 0xFF00) + (result >> 8) + return result + except IOError: + raise IOError( + f"Error accessing 0x{self.address:0x}: Check your I2C address." + ) + + def read_int16(self, register: int, little_endian=True): + """Reads a signed 16-bit value from the I2C device""" + result = self.read_uint16(register=register, little_endian=little_endian) + if result > 32767: + result -= 65536 + return result + + @staticmethod + def get_pi_revision() -> int: + """Gets the version number of the Raspberry Pi board""" + # Revision list available at: + # http://elinux.org/RPi_HardwareHistory#Board_Revision_History + try: + with open("/proc/cpuinfo", "r") as infile: + for line in infile: + # Match a line of the form "Revision : 0002" while ignoring extra + # info in front of the revsion + # (like 1000 when the Pi was over-volted). + match = re.match("Revision\s+:\s+.*(\w{4})$", line) + if match and match.group(1) in ["0000", "0002", "0003"]: + # Return revision 1 if revision ends with 0000, 0002 or 0003. + return 1 + elif match: + # Assume revision 2 if revision ends with any other 4 chars. + return 2 + # Couldn't find the revision, assume revision 0 like older code for + # compatibility. + return 0 + except Exception: + return 0 + + @staticmethod + def get_pi_i2v_bus_number() -> int: + # Gets the I2C bus number /dev/i2c# + return 1 if I2C.get_pi_revision() > 1 else 0 diff --git a/lib/py_edge_device/carlos/edge/device/runtime.py b/lib/py_edge_device/carlos/edge/device/runtime.py index 14949287..39ce1972 100644 --- a/lib/py_edge_device/carlos/edge/device/runtime.py +++ b/lib/py_edge_device/carlos/edge/device/runtime.py @@ -3,41 +3,41 @@ from datetime import timedelta from pathlib import Path +from typing import Self from apscheduler import AsyncScheduler from apscheduler.triggers.interval import IntervalTrigger -from carlos.edge.interface import EdgeConnectionDisconnected, EdgeProtocol +from carlos.edge.interface import DeviceId, EdgeConnectionDisconnected, EdgeProtocol +from carlos.edge.interface.device.driver import validate_device_address_space from carlos.edge.interface.protocol import PING from loguru import logger from .communication import DeviceCommunicationHandler -from .config import DeviceConfig +from .config import load_drivers # We don't cover this in the unit tests. This needs to be tested in an integration test. class DeviceRuntime: # pragma: no cover - def __init__(self, config: DeviceConfig, protocol: EdgeProtocol): + def __init__(self, device_id: DeviceId, protocol: EdgeProtocol): """Initializes the device runtime. - :param config: The configuration of the device. + :param device_id: The unique identifier of the device. + :param protocol: The concrete implementation of the EdgeProtocol. """ - self.config = config + self.device_id = device_id self.protocol = protocol + self.driver_manager = DriverManager() + async def run(self): """Runs the device runtime.""" - logger.add( - sink=Path.cwd() / ".carlos_data" / "device" / "device_log_{time}.log", - level="INFO", - rotation="50 MB", - retention=timedelta(days=60), - ) + self._prepare_runtime() communication_handler = DeviceCommunicationHandler( - protocol=self.protocol, device_id=self.config.device_id + protocol=self.protocol, device_id=self.device_id ) if not self.protocol.is_connected: @@ -49,6 +49,7 @@ async def run(self): kwargs={"communication_handler": communication_handler}, trigger=IntervalTrigger(minutes=1), ) + self.io_manager.register_tasks(scheduler=scheduler) await scheduler.start_in_background() while True: @@ -60,6 +61,36 @@ async def run(self): except EdgeConnectionDisconnected: await self.protocol.connect() + def _prepare_runtime(self): + + logger.add( + sink=Path.cwd() / ".carlos_data" / "device" / "device_log_{time}.log", + level="INFO", + rotation="50 MB", + retention=timedelta(days=60), + ) + + +class DriverManager: # pragma: no cover + + def __init__(self): + + self.drivers = load_drivers() + validate_device_address_space(self.drivers) + + def setup(self) -> Self: + """Sets up the I/O peripherals.""" + for driver in self.drivers: + logger.debug(f"Setting up driver {driver}.") + driver.setup() + + return self + + def register_tasks(self, scheduler: AsyncScheduler) -> Self: + """Registers the tasks of the I/O peripherals.""" + + return self + async def send_ping( communication_handler: DeviceCommunicationHandler, diff --git a/lib/py_edge_device/poetry.lock b/lib/py_edge_device/poetry.lock index ebf98132..569195c7 100644 --- a/lib/py_edge_device/poetry.lock +++ b/lib/py_edge_device/poetry.lock @@ -148,7 +148,7 @@ test = ["coverage", "freezegun", "pre-commit", "pytest", "pytest-cov", "pytest-m [[package]] name = "carlos-edge-interface" -version = "0.1.0" +version = "0.1.1" description = "Shared library to handle the edge communication." optional = false python-versions = ">=3.11,<3.12" @@ -712,6 +712,34 @@ files = [ [package.dependencies] wcwidth = "*" +[[package]] +name = "psutil" +version = "5.9.8" +description = "Cross-platform lib for process and system monitoring in Python." +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*" +files = [ + {file = "psutil-5.9.8-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:26bd09967ae00920df88e0352a91cff1a78f8d69b3ecabbfe733610c0af486c8"}, + {file = "psutil-5.9.8-cp27-cp27m-manylinux2010_i686.whl", hash = "sha256:05806de88103b25903dff19bb6692bd2e714ccf9e668d050d144012055cbca73"}, + {file = "psutil-5.9.8-cp27-cp27m-manylinux2010_x86_64.whl", hash = "sha256:611052c4bc70432ec770d5d54f64206aa7203a101ec273a0cd82418c86503bb7"}, + {file = "psutil-5.9.8-cp27-cp27mu-manylinux2010_i686.whl", hash = "sha256:50187900d73c1381ba1454cf40308c2bf6f34268518b3f36a9b663ca87e65e36"}, + {file = "psutil-5.9.8-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:02615ed8c5ea222323408ceba16c60e99c3f91639b07da6373fb7e6539abc56d"}, + {file = "psutil-5.9.8-cp27-none-win32.whl", hash = "sha256:36f435891adb138ed3c9e58c6af3e2e6ca9ac2f365efe1f9cfef2794e6c93b4e"}, + {file = "psutil-5.9.8-cp27-none-win_amd64.whl", hash = "sha256:bd1184ceb3f87651a67b2708d4c3338e9b10c5df903f2e3776b62303b26cb631"}, + {file = "psutil-5.9.8-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:aee678c8720623dc456fa20659af736241f575d79429a0e5e9cf88ae0605cc81"}, + {file = "psutil-5.9.8-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8cb6403ce6d8e047495a701dc7c5bd788add903f8986d523e3e20b98b733e421"}, + {file = "psutil-5.9.8-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d06016f7f8625a1825ba3732081d77c94589dca78b7a3fc072194851e88461a4"}, + {file = "psutil-5.9.8-cp36-cp36m-win32.whl", hash = "sha256:7d79560ad97af658a0f6adfef8b834b53f64746d45b403f225b85c5c2c140eee"}, + {file = "psutil-5.9.8-cp36-cp36m-win_amd64.whl", hash = "sha256:27cc40c3493bb10de1be4b3f07cae4c010ce715290a5be22b98493509c6299e2"}, + {file = "psutil-5.9.8-cp37-abi3-win32.whl", hash = "sha256:bc56c2a1b0d15aa3eaa5a60c9f3f8e3e565303b465dbf57a1b730e7a2b9844e0"}, + {file = "psutil-5.9.8-cp37-abi3-win_amd64.whl", hash = "sha256:8db4c1b57507eef143a15a6884ca10f7c73876cdf5d51e713151c1236a0e68cf"}, + {file = "psutil-5.9.8-cp38-abi3-macosx_11_0_arm64.whl", hash = "sha256:d16bbddf0693323b8c6123dd804100241da461e41d6e332fb0ba6058f630f8c8"}, + {file = "psutil-5.9.8.tar.gz", hash = "sha256:6be126e3225486dff286a8fb9a06246a5253f4c7c53b475ea5f5ac934e64194c"}, +] + +[package.extras] +test = ["enum34", "ipaddress", "mock", "pywin32", "wmi"] + [[package]] name = "pydantic" version = "2.7.0" @@ -1081,6 +1109,21 @@ typing-extensions = "*" [package.extras] dev = ["flake8", "flake8-docstrings", "mypy", "packaging", "pre-commit", "pytest", "pytest-cov", "types-setuptools"] +[[package]] +name = "rpi-gpio" +version = "0.7.1" +description = "A module to control Raspberry Pi GPIO channels" +optional = false +python-versions = "*" +files = [ + {file = "RPi.GPIO-0.7.1-cp27-cp27mu-linux_armv6l.whl", hash = "sha256:b86b66dc02faa5461b443a1e1f0c1d209d64ab5229696f32fb3b0215e0600c8c"}, + {file = "RPi.GPIO-0.7.1-cp310-cp310-linux_armv6l.whl", hash = "sha256:57b6c044ef5375a78c8dda27cdfadf329e76aa6943cd6cffbbbd345a9adf9ca5"}, + {file = "RPi.GPIO-0.7.1-cp37-cp37m-linux_armv6l.whl", hash = "sha256:77afb817b81331ce3049a4b8f94a85e41b7c404d8e56b61ac0f1eb75c3120868"}, + {file = "RPi.GPIO-0.7.1-cp38-cp38-linux_armv6l.whl", hash = "sha256:29226823da8b5ccb9001d795a944f2e00924eeae583490f0bc7317581172c624"}, + {file = "RPi.GPIO-0.7.1-cp39-cp39-linux_armv6l.whl", hash = "sha256:15311d3b063b71dee738cd26570effc9985a952454d162937c34e08c0fc99902"}, + {file = "RPi.GPIO-0.7.1.tar.gz", hash = "sha256:cd61c4b03c37b62bba4a5acfea9862749c33c618e0295e7e90aa4713fb373b70"}, +] + [[package]] name = "ruff" version = "0.3.7" @@ -1118,6 +1161,22 @@ files = [ {file = "semver-3.0.2.tar.gz", hash = "sha256:6253adb39c70f6e51afed2fa7152bcd414c411286088fb4b9effb133885ab4cc"}, ] +[[package]] +name = "smbus2" +version = "0.4.3" +description = "smbus2 is a drop-in replacement for smbus-cffi/smbus-python in pure Python" +optional = false +python-versions = "*" +files = [ + {file = "smbus2-0.4.3-py2.py3-none-any.whl", hash = "sha256:a2fc29cfda4081ead2ed61ef2c4fc041d71dd40a8d917e85216f44786fca2d1d"}, + {file = "smbus2-0.4.3.tar.gz", hash = "sha256:36f2288a8e1a363cb7a7b2244ec98d880eb5a728a2494ac9c71e9de7bf6a803a"}, +] + +[package.extras] +docs = ["sphinx (>=1.5.3)"] +qa = ["flake8"] +test = ["mock", "nose"] + [[package]] name = "sniffio" version = "1.3.1" @@ -1242,6 +1301,17 @@ files = [ {file = "tomlkit-0.12.4.tar.gz", hash = "sha256:7ca1cfc12232806517a8515047ba66a19369e71edf2439d0f5824f91032b6cc3"}, ] +[[package]] +name = "types-psutil" +version = "5.9.5.20240316" +description = "Typing stubs for psutil" +optional = false +python-versions = ">=3.8" +files = [ + {file = "types-psutil-5.9.5.20240316.tar.gz", hash = "sha256:5636f5714bb930c64bb34c4d47a59dc92f9d610b778b5364a31daa5584944848"}, + {file = "types_psutil-5.9.5.20240316-py3-none-any.whl", hash = "sha256:2fdd64ea6e97befa546938f486732624f9255fde198b55e6f00fda236f059f64"}, +] + [[package]] name = "types-pyyaml" version = "6.0.12.20240311" @@ -1353,4 +1423,4 @@ dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"] [metadata] lock-version = "2.0" python-versions = ">=3.11,<3.12" -content-hash = "1567e5e5cfd6b309a9ad9da8f8c1d9130717d3a8683d4c39b7f380ec62a82f83" +content-hash = "1360c9bb3a5033c9673e55b9ddf9db48210ddf6dda4e73057645c9599f01f322" diff --git a/lib/py_edge_device/pyproject.toml b/lib/py_edge_device/pyproject.toml index f4248b4a..fa10b7ee 100644 --- a/lib/py_edge_device/pyproject.toml +++ b/lib/py_edge_device/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "carlos.edge.device" -version = "0.1.1" +version = "0.1.2" description = "The library for the edge device of the carlos project." authors = ["Felix Fanghanel"] license = "MIT" @@ -16,10 +16,14 @@ apscheduler = "^4.0.0a4" pydantic = "^2.6.4" pyyaml = "^6.0.1" "carlos.edge.interface" = {path = "../py_edge_interface"} +psutil = "^5.9.8" +rpi-gpio = {version = "^0.7.1", markers = "platform_machine == 'armv7l' or platform_machine == 'aarch64'"} +smbus2 = "^0.4.3" [tool.poetry.group.dev.dependencies] "devtools" = {path = "../py_dev_dependencies"} types-pyyaml = "^6.0.12.20240311" +types-psutil = "^5.9.5.20240316" [build-system] @@ -27,7 +31,7 @@ requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" [tool.bumpversion] -current_version = "0.1.1" +current_version = "0.1.2" commit = true tag = false parse = "(?P\\d+)\\.(?P\\d+)\\.(?P\\d+)(\\-(?P[a-z0-9\\.]+))?" @@ -76,4 +80,6 @@ exclude_lines = [ omit = [ # omit all tests "*_test.py", + # depends on actual device and hardware + "carlos/edge/device/driver/*.py" ] diff --git a/lib/py_edge_device/tests/__init__.py b/lib/py_edge_device/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lib/py_edge_device/tests/test_data/__init__.py b/lib/py_edge_device/tests/test_data/__init__.py new file mode 100644 index 00000000..999b0f15 --- /dev/null +++ b/lib/py_edge_device/tests/test_data/__init__.py @@ -0,0 +1,6 @@ +from pathlib import Path + +TEST_DEVICE_WORKDIR = Path(__file__).parent + +EXPECTED_IO_COUNT = 11 +"""10 configured I/Os + 1 default I/O.""" diff --git a/lib/py_edge_device/tests/test_data/device_config b/lib/py_edge_device/tests/test_data/device_config new file mode 100644 index 00000000..f0c34133 --- /dev/null +++ b/lib/py_edge_device/tests/test_data/device_config @@ -0,0 +1,30 @@ +drivers: + - identifier: temp-humi-intern + driver_module: dht11 + pin: 4 + - identifier: uv-light + driver_module: si1145 + - identifier: pump + driver_module: relay + pin: 20 + - identifier: valve-1 + driver_module: relay + pin: 21 + - identifier: valve-2 + driver_module: relay + pin: 22 + - identifier: valve-3 + driver_module: relay + pin: 23 + - identifier: valve-4 + driver_module: carlos.edge.device.driver.relay + pin: 24 + - identifier: relay-6 + driver_module: carlos.edge.device.driver.relay + pin: 25 + - identifier: relay-7 + driver_module: carlos.edge.device.driver.relay + pin: 26 + - identifier: relay-8 + driver_module: carlos.edge.device.driver.relay + pin: 27 diff --git a/lib/py_edge_interface/carlos/edge/interface/device/__init__.py b/lib/py_edge_interface/carlos/edge/interface/device/__init__.py new file mode 100644 index 00000000..6ca4894c --- /dev/null +++ b/lib/py_edge_interface/carlos/edge/interface/device/__init__.py @@ -0,0 +1,12 @@ +__all__ = [ + "AnalogInput", + "CarlosDriver", + "DigitalOutput", + "DriverConfig", + "DriverFactory", + "GpioDriverConfig", + "I2cDriverConfig", +] + +from .driver import AnalogInput, CarlosDriver, DigitalOutput, DriverFactory +from .driver_config import DriverConfig, GpioDriverConfig, I2cDriverConfig diff --git a/lib/py_edge_interface/carlos/edge/interface/device/driver.py b/lib/py_edge_interface/carlos/edge/interface/device/driver.py new file mode 100644 index 00000000..6ac759fa --- /dev/null +++ b/lib/py_edge_interface/carlos/edge/interface/device/driver.py @@ -0,0 +1,247 @@ +__all__ = [ + "AnalogInput", + "DigitalOutput", + "CarlosDriver", + "DriverFactory", + "validate_device_address_space", +] +import asyncio +import concurrent.futures +from abc import ABC, abstractmethod +from collections import namedtuple +from time import sleep +from typing import Any, Callable, Generic, Iterable, Self, TypeVar + +from .driver_config import ( + DirectionMixin, + DriverConfig, + GpioDriverConfig, + I2cDriverConfig, +) + +DriverConfigTypeVar = TypeVar("DriverConfigTypeVar", bound=DriverConfig) + + +class CarlosDriverBase(ABC, Generic[DriverConfigTypeVar]): + """Common base class for all drivers.""" + + def __init__(self, config: DriverConfigTypeVar): + self.config: DriverConfigTypeVar = config + + def __str__(self): + return f"{self.config.identifier} ({self.config.driver_module})" + + @property + def identifier(self): + return self.config.identifier + + @abstractmethod + def setup(self) -> Self: + """Sets up the peripheral. This is required for testing. As the test runner + is not able to run the setup method of the peripheral outside the device.""" + pass + + @abstractmethod + def test(self): + """Tests the peripheral. This is used to validate a config by a human.""" + pass + + +class AnalogInput(CarlosDriverBase, ABC): + """Common base class for all analog input peripherals.""" + + def __init__(self, config: DriverConfigTypeVar): + + if isinstance(config, DirectionMixin): + if config.direction != "input": + raise ValueError( + "Recieved a non-input configuration for an analog input." + ) + + super().__init__(config) + + @abstractmethod + def read(self) -> dict[str, float]: + """Reads the value of the analog input. The return value is a dictionary + containing the value of the analog input.""" + pass + + def test(self): + """Tests the analog input by reading the value.""" + + return self.read() + + async def read_async(self) -> dict[str, float]: + """Reads the value of the analog input asynchronously. The return value is a + dictionary containing the value of the analog input.""" + + loop = asyncio.get_running_loop() + + with concurrent.futures.ThreadPoolExecutor() as pool: + return await loop.run_in_executor(executor=pool, func=self.read) + + +class DigitalOutput(CarlosDriverBase, ABC): + """Common base class for all digital output peripherals.""" + + def __init__(self, config: DriverConfigTypeVar): + + if isinstance(config, DirectionMixin): + if config.direction != "output": + raise ValueError( + "Recieved a non-output configuration for a digital output." + ) + + super().__init__(config) + + @abstractmethod + def set(self, value: bool): + pass + + def test(self): + """Tests the digital output by setting the value to False, then True for 1 second, + and then back to False.""" + + self.set(False) + self.set(True) + sleep(1) + self.set(False) + + +CarlosDriver = AnalogInput | DigitalOutput + +DriverDefinition = namedtuple("DriverDefinition", ["config", "factory"]) + + +class DriverFactory: + """A singleton factory for io peripherals.""" + + _instance = None + _driver_index: dict[str, DriverDefinition] = {} + + def __new__(cls): + if cls._instance is None: + cls._instance = super(DriverFactory, cls).__new__(cls) + cls._instance._driver_index = {} + + return cls._instance + + def register( + self, + driver_module: str, + config: type[DriverConfigTypeVar], + factory: Callable[[DriverConfigTypeVar], CarlosDriver], + ): + """Registers a peripheral with the peripheral registry. + + :param driver_module: The peripheral type. + :param config: The peripheral configuration model. + :param factory: The peripheral factory function. + :raises ValueError: If the config is not a subclass of DriverConfig. + :raises RuntimeError: If the peripheral is already registered. + """ + + if not issubclass(config, DriverConfig): + raise ValueError( + "The config must be a subclass of DriverConfig. " + "Please ensure that the config class is a subclass of DriverConfig." + ) + + if driver_module in self._driver_index: + raise RuntimeError(f"The peripheral {driver_module} is already registered.") + + self._driver_index[driver_module] = DriverDefinition(config, factory) + + def build(self, config: dict[str, Any]) -> CarlosDriver: + """Builds a IO object from its configuration. + + :param config: The raw configuration. The schema must adhere to the + DriverConfig model. But we require the full config as the ios may require + additional parameters. + :returns: The IO object. + :raises RuntimeError: If the driver is not registered. + """ + + io_config = DriverConfig.model_validate(config) + + if io_config.driver_module not in self._driver_index: + raise RuntimeError( + f"The driver {io_config.driver_module} is not registered." + f"Make sure to register `DriverFactory().register(...)` " + f"the peripheral before building it." + ) + + driver_definition = self._driver_index[io_config.driver_module] + + return driver_definition.factory( + driver_definition.config.model_validate(config) + ) + + +I2C_PINS = [2, 3] +"""The Pin numbers designated for I2C communication.""" + + +def validate_device_address_space(drivers: Iterable[CarlosDriver]): + """This function ensures that the configured pins and addresses are unique. + + :param drivers: The list of IOs to validate. + :raises ValueError: If any of the pins or addresses are configured more than once. + If the GPIO pins 2 and 3 are when I2C communication is configured. + If any of the identifiers are configured more than once. + """ + + # Ensure all identifiers are unique + seen_identifiers = set() + duplicate_identifiers = [ + driver.identifier + for driver in drivers + if driver.identifier in seen_identifiers or seen_identifiers.add(driver.identifier) # type: ignore[func-returns-value] # noqa: E501 + ] + if duplicate_identifiers: + raise ValueError( + f"The identifiers {duplicate_identifiers} are configured more than " + f"once. Please ensure that each identifier is configured only once." + ) + + configs = [io.config for io in drivers] + + gpio_configs: list[GpioDriverConfig] = [ + io for io in configs if isinstance(io, GpioDriverConfig) + ] + + # Ensure GPIO pins are unique + seen_pins = set() + duplicate_gpio_pins = [ + gpio.pin + for gpio in gpio_configs + if gpio.pin in seen_pins or seen_pins.add(gpio.pin) # type: ignore[func-returns-value] # noqa: E501 + ] + if duplicate_gpio_pins: + raise ValueError( + f"The GPIO pins {duplicate_gpio_pins} are configured more than once." + f"Please ensure that each GPIO pin is configured only once." + ) + + i2c_configs: list[I2cDriverConfig] = [ + io for io in configs if isinstance(io, I2cDriverConfig) + ] + if i2c_configs: + if any(gpio.pin in I2C_PINS for gpio in gpio_configs): + raise ValueError( + "The GPIO pins 2 and 3 are reserved for I2C communication." + "Please use other pins for GPIO configuration." + ) + + # Ensure I2C addresses are unique + seen_addresses = set() + duplicate_i2c_addresses = [ + i2c.address + for i2c in i2c_configs + if i2c.address in seen_addresses or seen_addresses.add(i2c.address) # type: ignore[func-returns-value] # noqa: E501 + ] + if duplicate_i2c_addresses: + raise ValueError( + f"The I2C addresses {duplicate_i2c_addresses} are configured more than " + f"once. Please ensure that each I2C address is configured only once." + ) diff --git a/lib/py_edge_interface/carlos/edge/interface/device/driver_config.py b/lib/py_edge_interface/carlos/edge/interface/device/driver_config.py new file mode 100644 index 00000000..3cf5661e --- /dev/null +++ b/lib/py_edge_interface/carlos/edge/interface/device/driver_config.py @@ -0,0 +1,131 @@ +__all__ = [ + "DirectionMixin", + "DriverConfig", + "GpioDriverConfig", + "I2cDriverConfig", +] + +import importlib +from typing import Literal + +from pydantic import BaseModel, Field, field_validator + +# Pin layout +# 5V, 5V, GND, 14, 15, 18, GND, 23, 24, GND, 25, 8, 7, ID EEPROM, GND, 12, GND, 16, 20, 21 # Outer pins # noqa +# 3.3V, 2, 3, 4, GND, 17, 27, 22, 3.3V, 10, 9, 11, GND, ID EEPROM, 5, 6, 13, 19, 26, GND # Inner pins # noqa + + +class DriverConfig(BaseModel): + """Common base class for all driver_module configurations.""" + + identifier: str = Field( + ..., + description="A unique identifier for the driver_module configuration. " + "It is used to allow changing addresses, pins if required later.", + ) + + driver_module: str = Field( + ..., + description="Refers to the module name that implements the IO driver_module. " + "Built-in drivers located in carlos.edge.device.driver module " + "don't need to specify the full path. Each driver_module module" + "must make a call to the DriverFactory.register method to register" + "itself.", + ) + + @field_validator("driver_module", mode="after") + def _validate_driver_module(cls, value): + """Converts a module name to a full module path.""" + + # check if the given module exists in the current working directory. + try: + importlib.import_module(value) + except ModuleNotFoundError: + abs_module = "carlos.edge.device.driver" + "." + value + try: + importlib.import_module(abs_module) + except ModuleNotFoundError: + raise ValueError(f"The module {value} ({abs_module}) does not exist.") + value = abs_module # pragma: no cover + + return value + + +class DirectionMixin(BaseModel): + direction: Literal["input", "output"] = Field( + ..., description="The direction of the IO." + ) + + +class GpioDriverConfig(DriverConfig, DirectionMixin): + """Defines a single input configuration.""" + + protocol: Literal["gpio"] = Field( + "gpio", + description="The communication protocol to be used for the IO.", + ) + + pin: Literal[ + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20, + 21, + 22, + 23, + 24, + 25, + 26, + 27, + ] = Field(..., description="The GPIO pin number.") + + +class I2cDriverConfig(DriverConfig, DirectionMixin): + """Defines a single input configuration.""" + + protocol: Literal["i2c"] = Field( + "i2c", + description="The communication protocol to be used for the IO.", + ) + + address: str = Field(..., description="The I2C address of the device.") + + @field_validator("address", mode="before") + def validate_address(cls, value): + """Validate the I2C address.""" + + if isinstance(value, str): + if value.startswith("0x"): + value = value[2:] + + try: + value = int(value, 16) + except ValueError: + raise ValueError("The I2C address must be a valid hexadecimal value.") + + # first 2 are reserved + # address length is 7 bits, but the majority of literature defines 0x77 as max + if not 0x03 <= int(value) <= 0x77: + raise ValueError("The valid I2C address range is 0x03 to 0x77.") + + return f"0x{value:02x}" + + @property + def address_int(self): + """Returns the I2C address as an integer.""" + return int(self.address, 16) diff --git a/lib/py_edge_interface/carlos/edge/interface/device/driver_config_test.py b/lib/py_edge_interface/carlos/edge/interface/device/driver_config_test.py new file mode 100644 index 00000000..64d25aea --- /dev/null +++ b/lib/py_edge_interface/carlos/edge/interface/device/driver_config_test.py @@ -0,0 +1,77 @@ +from contextlib import nullcontext + +import pytest +from pydantic import ValidationError + +from .driver_config import DriverConfig, I2cDriverConfig + +VALID_DRIVER_MODULE = __name__ +"""For a driver module to be valid, it must be importable. Everything else +is checked else where.""" + + +class TestDriverConfig: + + @pytest.mark.parametrize( + "driver_module, expected", + [ + pytest.param(VALID_DRIVER_MODULE, VALID_DRIVER_MODULE, id="valid module"), + pytest.param("non_existing_module", ValueError, id="invalid module"), + ], + ) + def test_driver_module_validation( + self, driver_module: str, expected: str | type[Exception] + ): + """This function ensures that the driver module is valid.""" + + if isinstance(expected, str): + context = nullcontext() + else: + context = pytest.raises(expected) + + with context: + config = DriverConfig( + identifier="does-not-matter", driver_module=driver_module + ) + + assert config.driver_module == expected + + +class TestI2cDriverConfig: + + @pytest.mark.parametrize( + "address, expected", + [ + pytest.param("0x03", "0x03", id="string: minimum address"), + pytest.param("0x77", "0x77", id="string: maximum address"), + pytest.param("0x00", ValidationError, id="string: below minimum address"), + pytest.param("0x78", ValidationError, id="string: above maximum address"), + pytest.param("1e", "0x1e", id="string: valid address without 0x"), + pytest.param("0xij", ValidationError, id="string: invalid hex"), + pytest.param("0x0A", "0x0a", id="string: upper case hex"), + pytest.param(0x03, "0x03", id="int: minimum address"), + pytest.param(0x77, "0x77", id="int: maximum address"), + pytest.param(0x00, ValidationError, id="int: below minimum address"), + pytest.param(0x78, ValidationError, id="int: above maximum address"), + ], + ) + def test_address_validation( + self, address: str | int, expected: str | type[Exception] + ): + """This function ensures that the address is valid.""" + + if isinstance(expected, str): + context = nullcontext() + else: + context = pytest.raises(expected) + + with context: + config = I2cDriverConfig( + identifier="does-not-matter", + driver_module=VALID_DRIVER_MODULE, + direction="input", # does not matter for this test + address=address, + ) + + assert config.address == expected + assert config.address_int == int(config.address, 16) diff --git a/lib/py_edge_interface/carlos/edge/interface/device/driver_test.py b/lib/py_edge_interface/carlos/edge/interface/device/driver_test.py new file mode 100644 index 00000000..9301036e --- /dev/null +++ b/lib/py_edge_interface/carlos/edge/interface/device/driver_test.py @@ -0,0 +1,250 @@ +from contextlib import nullcontext +from secrets import token_hex +from typing import Self + +import pytest +from pydantic import BaseModel + +from .driver import ( + AnalogInput, + CarlosDriver, + DigitalOutput, + DriverFactory, + validate_device_address_space, +) +from .driver_config import GpioDriverConfig, I2cDriverConfig + +DRIVER_MODULE = __name__ + +ANALOG_INPUT_VALUE = {"value": 0.0} +ANALOG_INPUT_CONFIG = GpioDriverConfig( + identifier="analog-input-test", + driver_module=DRIVER_MODULE, + direction="input", + pin=13, +) +DIGITAL_OUTPUT_CONFIG = GpioDriverConfig( + identifier="digital-output-test", + driver_module=DRIVER_MODULE, + direction="output", + pin=14, +) + + +class AnalogInputTest(AnalogInput): + + def setup(self): + pass + + def read(self) -> dict[str, float]: + return ANALOG_INPUT_VALUE + + +def test_carlos_driver_base(): + """This test test the methods of the CarlosDriverBase class via the + AnalogInputTest class.""" + + driver = AnalogInputTest(config=ANALOG_INPUT_CONFIG) + + assert isinstance(str(driver), str), "Could not convert driver to string." + assert ( + driver.identifier == ANALOG_INPUT_CONFIG.identifier + ), "Identifier should be the same as in the config." + + +def test_analog_input(): + """This test tests the AnalogInput Interface bia the AnalogInputTest class.""" + analog_input = AnalogInputTest(config=ANALOG_INPUT_CONFIG) + + assert ( + analog_input.test() == ANALOG_INPUT_VALUE + ), "Test function should return a reading." + + # using output config for input should raise an error + with pytest.raises(ValueError): + AnalogInputTest(config=DIGITAL_OUTPUT_CONFIG) + + +@pytest.mark.asyncio +async def test_async_analog_input(): + """This test tests the AnalogInput Interface bia the AnalogInputTest class.""" + analog_input = AnalogInputTest(config=ANALOG_INPUT_CONFIG) + + assert ( + await analog_input.read_async() == ANALOG_INPUT_VALUE + ), "Test function should return a reading." + + +class DigitalOutputTest(DigitalOutput): + + def setup(self) -> Self: + self.pytest_state = None + return self + + def set(self, value: bool): + self.pytest_state = value + + +def test_digital_output(): + """This test tests the DigitalOutput Interface bia the DigitalOutputTest class.""" + digital_output = DigitalOutputTest(config=DIGITAL_OUTPUT_CONFIG).setup() + + assert digital_output.pytest_state is None, "Initial state should be None." + + digital_output.test() + + assert ( + digital_output.pytest_state is not None + ), "State should be set to a value after running the test." + + # using input config for output should raise an error + with pytest.raises(ValueError): + DigitalOutputTest(config=ANALOG_INPUT_CONFIG) + + +def test_driver_factory(): + """This test tests the driver factory function.""" + + factory = DriverFactory() + + raw_analog_input_config = ANALOG_INPUT_CONFIG.model_dump(mode="json") + + # Sofar the AnalogInputTest class was never registered, thus the build method + # should raise a RuntimeError. + with pytest.raises(RuntimeError): + factory.build(raw_analog_input_config) + + factory.register( + driver_module=DRIVER_MODULE, config=GpioDriverConfig, factory=AnalogInputTest + ) + + # Trying to register a driver with the same driver_module should raise a RuntimeError. + with pytest.raises(RuntimeError): + factory.register( + driver_module=DRIVER_MODULE, + config=GpioDriverConfig, + factory=AnalogInputTest, + ) + + # Passing a non DriverConfig type as config should raise a ValueError. + with pytest.raises(ValueError): + factory.register( + driver_module=DRIVER_MODULE + ".test", + config=BaseModel, + factory=DigitalOutputTest, + ) + + driver = factory.build(raw_analog_input_config) + assert isinstance( + driver, AnalogInputTest + ), "Driver should be an instance of AnalogInputTest." + assert isinstance( + driver.config, GpioDriverConfig + ), "Config should be an instance of GpioDriverConfig." + + +@pytest.mark.parametrize( + "drivers, expected_exception", + [ + pytest.param([AnalogInputTest(ANALOG_INPUT_CONFIG)], None, id="valid-single"), + pytest.param( + [ + AnalogInputTest(ANALOG_INPUT_CONFIG), + DigitalOutputTest(DIGITAL_OUTPUT_CONFIG), + ], + None, + id="valid-multiple", + ), + pytest.param( + [ + AnalogInputTest( + GpioDriverConfig( + identifier="test", + pin=2, + direction="input", + driver_module=DRIVER_MODULE, + ) + ) + ], + None, + id="valid-i2c-ping-used", + ), + pytest.param( + [ + AnalogInputTest(ANALOG_INPUT_CONFIG), + AnalogInputTest(ANALOG_INPUT_CONFIG), + ], + ValueError, + id="duplicate-identifier", + ), + pytest.param( + [ + AnalogInputTest(ANALOG_INPUT_CONFIG), + AnalogInputTest( + ANALOG_INPUT_CONFIG.model_copy( + update={"identifier": "new-identifier"} + ) + ), + ], + ValueError, + id="duplicate-identifier-gpio-pin", + ), + pytest.param( + [ + AnalogInputTest( + I2cDriverConfig( + identifier=token_hex(4), + driver_module=DRIVER_MODULE, + direction="input", + address="0x04", + ) + ), + AnalogInputTest( + GpioDriverConfig( + identifier="test", + pin=2, + direction="input", + driver_module=DRIVER_MODULE, + ) + ), + ], + ValueError, + id="i2c-pin-used", + ), + pytest.param( + [ + AnalogInputTest( + I2cDriverConfig( + identifier=token_hex(4), + driver_module=DRIVER_MODULE, + direction="input", + address="0x04", + ) + ), + AnalogInputTest( + I2cDriverConfig( + identifier=token_hex(4), + driver_module=DRIVER_MODULE, + direction="input", + address="0x04", + ) + ), + ], + ValueError, + id="duplicate-i2c-address", + ), + ], +) +def test_validate_device_address_space( + drivers: list[CarlosDriver], expected_exception: type[Exception] | None +): + """This method ensures that the validate_device_address_space() works + as expected.""" + + if expected_exception is not None: + context = pytest.raises(expected_exception) + else: + context = nullcontext() + + with context: + validate_device_address_space(drivers) diff --git a/lib/py_edge_interface/pyproject.toml b/lib/py_edge_interface/pyproject.toml index f52f7c76..e3b3746f 100644 --- a/lib/py_edge_interface/pyproject.toml +++ b/lib/py_edge_interface/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "carlos.edge.interface" -version = "0.1.0" +version = "0.1.1" description = "Shared library to handle the edge communication." authors = ["Felix Fanghanel"] license = "MIT" @@ -23,7 +23,7 @@ requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" [tool.bumpversion] -current_version = "0.1.0" +current_version = "0.1.1" commit = true tag = false parse = "(?P\\d+)\\.(?P\\d+)\\.(?P\\d+)(\\-(?P[a-z0-9\\.]+))?" diff --git a/services/device/device/cli/config.py b/services/device/device/cli/config.py index 70946016..b634b6f4 100644 --- a/services/device/device/cli/config.py +++ b/services/device/device/cli/config.py @@ -3,9 +3,17 @@ from typing import TypeVar import typer +from carlos.edge.device.runtime import DriverManager from pydantic import BaseModel from pydantic_core import PydanticUndefinedType from rich import print, print_json +from rich.console import Console, Group +from rich.live import Live +from rich.panel import Panel +from rich.pretty import Pretty +from rich.rule import Rule +from rich.spinner import Spinner +from rich.traceback import Traceback from device.connection import ( ConnectionSettings, @@ -13,6 +21,8 @@ write_connection_settings, ) +console = Console() + config_cli = typer.Typer() @@ -54,3 +64,47 @@ def show(): print("\n[bold]Connection[/bold] configuration:") print_json(read_connection_settings().model_dump_json()) + + +@config_cli.command() +def test(): # pragma: no cover + """Tests the io peripherals.""" + + driver_result_ui = [] + failed = [] + passed_cnt = 0 + with Live(Group(), refresh_per_second=4) as live: + for driver in DriverManager().setup().drivers: + driver_result_ui.append( + Panel( + Spinner(name="aesthetic", text="testing..."), + padding=(1, 2), + title=str(driver), + title_align="left", + subtitle="testing", + subtitle_align="right", + ) + ) + live.update(Group(*driver_result_ui)) + + try: + result = driver.test() + driver_result_ui[-1].renderable = ( + Pretty(result) if result else "[green]passed[/green]" + ) + driver_result_ui[-1].subtitle = "[green]passed[/green]" + passed_cnt += 1 + except Exception as e: + failed.append(driver) + driver_result_ui[-1].renderable = Traceback.from_exception( + type(e), e, e.__traceback__ + ) + driver_result_ui[-1].subtitle = "[red]failed[/red]" + + conclusions = [] + if passed_cnt > 0: + conclusions.append(f"[green]{passed_cnt} passed[/green]") + if failed: + conclusions.append(f"[red]{len(failed)} failed[/red]") + + live.update(Group(*driver_result_ui, Rule(", ".join(conclusions)))) diff --git a/services/device/device/run.py b/services/device/device/run.py index 12f6cde4..37fede57 100644 --- a/services/device/device/run.py +++ b/services/device/device/run.py @@ -1,4 +1,4 @@ -from carlos.edge.device import DeviceRuntime, read_config +from carlos.edge.device import DeviceRuntime from carlos.edge.device.constants import VERSION from loguru import logger @@ -24,12 +24,11 @@ async def main(): # pragma: no cover logger.info(f"Starting Carlos device (v{VERSION})...") - device_config = read_config() device_connection = read_connection_settings() protocol = DeviceWebsocketClient(settings=device_connection) runtime = DeviceRuntime( - config=device_config, + device_id=device_connection.device_id, protocol=protocol, ) await runtime.run() diff --git a/services/device/poetry.lock b/services/device/poetry.lock index 80e56863..274ce7a3 100644 --- a/services/device/poetry.lock +++ b/services/device/poetry.lock @@ -159,9 +159,12 @@ develop = false apscheduler = "^4.0.0a4" "carlos.edge.interface" = {path = "../py_edge_interface"} loguru = "^0.7.2" +psutil = "^5.9.8" pydantic = "^2.6.4" pyyaml = "^6.0.1" +rpi-gpio = {version = "^0.7.1", markers = "platform_machine == \"armv7l\" or platform_machine == \"aarch64\""} semver = "^3.0.2" +smbus2 = "^0.4.3" [package.source] type = "directory" @@ -789,6 +792,34 @@ files = [ [package.dependencies] wcwidth = "*" +[[package]] +name = "psutil" +version = "5.9.8" +description = "Cross-platform lib for process and system monitoring in Python." +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*" +files = [ + {file = "psutil-5.9.8-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:26bd09967ae00920df88e0352a91cff1a78f8d69b3ecabbfe733610c0af486c8"}, + {file = "psutil-5.9.8-cp27-cp27m-manylinux2010_i686.whl", hash = "sha256:05806de88103b25903dff19bb6692bd2e714ccf9e668d050d144012055cbca73"}, + {file = "psutil-5.9.8-cp27-cp27m-manylinux2010_x86_64.whl", hash = "sha256:611052c4bc70432ec770d5d54f64206aa7203a101ec273a0cd82418c86503bb7"}, + {file = "psutil-5.9.8-cp27-cp27mu-manylinux2010_i686.whl", hash = "sha256:50187900d73c1381ba1454cf40308c2bf6f34268518b3f36a9b663ca87e65e36"}, + {file = "psutil-5.9.8-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:02615ed8c5ea222323408ceba16c60e99c3f91639b07da6373fb7e6539abc56d"}, + {file = "psutil-5.9.8-cp27-none-win32.whl", hash = "sha256:36f435891adb138ed3c9e58c6af3e2e6ca9ac2f365efe1f9cfef2794e6c93b4e"}, + {file = "psutil-5.9.8-cp27-none-win_amd64.whl", hash = "sha256:bd1184ceb3f87651a67b2708d4c3338e9b10c5df903f2e3776b62303b26cb631"}, + {file = "psutil-5.9.8-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:aee678c8720623dc456fa20659af736241f575d79429a0e5e9cf88ae0605cc81"}, + {file = "psutil-5.9.8-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8cb6403ce6d8e047495a701dc7c5bd788add903f8986d523e3e20b98b733e421"}, + {file = "psutil-5.9.8-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d06016f7f8625a1825ba3732081d77c94589dca78b7a3fc072194851e88461a4"}, + {file = "psutil-5.9.8-cp36-cp36m-win32.whl", hash = "sha256:7d79560ad97af658a0f6adfef8b834b53f64746d45b403f225b85c5c2c140eee"}, + {file = "psutil-5.9.8-cp36-cp36m-win_amd64.whl", hash = "sha256:27cc40c3493bb10de1be4b3f07cae4c010ce715290a5be22b98493509c6299e2"}, + {file = "psutil-5.9.8-cp37-abi3-win32.whl", hash = "sha256:bc56c2a1b0d15aa3eaa5a60c9f3f8e3e565303b465dbf57a1b730e7a2b9844e0"}, + {file = "psutil-5.9.8-cp37-abi3-win_amd64.whl", hash = "sha256:8db4c1b57507eef143a15a6884ca10f7c73876cdf5d51e713151c1236a0e68cf"}, + {file = "psutil-5.9.8-cp38-abi3-macosx_11_0_arm64.whl", hash = "sha256:d16bbddf0693323b8c6123dd804100241da461e41d6e332fb0ba6058f630f8c8"}, + {file = "psutil-5.9.8.tar.gz", hash = "sha256:6be126e3225486dff286a8fb9a06246a5253f4c7c53b475ea5f5ac934e64194c"}, +] + +[package.extras] +test = ["enum34", "ipaddress", "mock", "pywin32", "wmi"] + [[package]] name = "pydantic" version = "2.7.0" @@ -1158,6 +1189,21 @@ typing-extensions = "*" [package.extras] dev = ["flake8", "flake8-docstrings", "mypy", "packaging", "pre-commit", "pytest", "pytest-cov", "types-setuptools"] +[[package]] +name = "rpi-gpio" +version = "0.7.1" +description = "A module to control Raspberry Pi GPIO channels" +optional = false +python-versions = "*" +files = [ + {file = "RPi.GPIO-0.7.1-cp27-cp27mu-linux_armv6l.whl", hash = "sha256:b86b66dc02faa5461b443a1e1f0c1d209d64ab5229696f32fb3b0215e0600c8c"}, + {file = "RPi.GPIO-0.7.1-cp310-cp310-linux_armv6l.whl", hash = "sha256:57b6c044ef5375a78c8dda27cdfadf329e76aa6943cd6cffbbbd345a9adf9ca5"}, + {file = "RPi.GPIO-0.7.1-cp37-cp37m-linux_armv6l.whl", hash = "sha256:77afb817b81331ce3049a4b8f94a85e41b7c404d8e56b61ac0f1eb75c3120868"}, + {file = "RPi.GPIO-0.7.1-cp38-cp38-linux_armv6l.whl", hash = "sha256:29226823da8b5ccb9001d795a944f2e00924eeae583490f0bc7317581172c624"}, + {file = "RPi.GPIO-0.7.1-cp39-cp39-linux_armv6l.whl", hash = "sha256:15311d3b063b71dee738cd26570effc9985a952454d162937c34e08c0fc99902"}, + {file = "RPi.GPIO-0.7.1.tar.gz", hash = "sha256:cd61c4b03c37b62bba4a5acfea9862749c33c618e0295e7e90aa4713fb373b70"}, +] + [[package]] name = "ruff" version = "0.3.7" @@ -1206,6 +1252,22 @@ files = [ {file = "shellingham-1.5.4.tar.gz", hash = "sha256:8dbca0739d487e5bd35ab3ca4b36e11c4078f3a234bfce294b0a0291363404de"}, ] +[[package]] +name = "smbus2" +version = "0.4.3" +description = "smbus2 is a drop-in replacement for smbus-cffi/smbus-python in pure Python" +optional = false +python-versions = "*" +files = [ + {file = "smbus2-0.4.3-py2.py3-none-any.whl", hash = "sha256:a2fc29cfda4081ead2ed61ef2c4fc041d71dd40a8d917e85216f44786fca2d1d"}, + {file = "smbus2-0.4.3.tar.gz", hash = "sha256:36f2288a8e1a363cb7a7b2244ec98d880eb5a728a2494ac9c71e9de7bf6a803a"}, +] + +[package.extras] +docs = ["sphinx (>=1.5.3)"] +qa = ["flake8"] +test = ["mock", "nose"] + [[package]] name = "sniffio" version = "1.3.1"