diff --git a/lib/py_carlos_database/.idea/py_carlos_database.iml b/lib/py_carlos_database/.idea/py_carlos_database.iml
new file mode 100644
index 00000000..25e55379
--- /dev/null
+++ b/lib/py_carlos_database/.idea/py_carlos_database.iml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/lib/py_edge_device/carlos/edge/device/__init__.py b/lib/py_edge_device/carlos/edge/device/__init__.py
index a2f3875c..2d2b455c 100644
--- a/lib/py_edge_device/carlos/edge/device/__init__.py
+++ b/lib/py_edge_device/carlos/edge/device/__init__.py
@@ -1,4 +1,5 @@
-__all__ = ["DeviceRuntime", "read_config", "DeviceConfig"]
+__all__ = [
+ "DeviceRuntime",
+]
-from .config import DeviceConfig, read_config
from .runtime import DeviceRuntime
diff --git a/lib/py_edge_device/carlos/edge/device/config.py b/lib/py_edge_device/carlos/edge/device/config.py
index 945be2c9..bfbd8229 100644
--- a/lib/py_edge_device/carlos/edge/device/config.py
+++ b/lib/py_edge_device/carlos/edge/device/config.py
@@ -2,10 +2,8 @@
configuration of the application."""
__all__ = [
- "DeviceConfig",
- "read_config",
+ "load_drivers",
"read_config_file",
- "write_config",
"write_config_file",
]
@@ -13,17 +11,12 @@
from typing import TypeVar
import yaml
-from carlos.edge.interface import DeviceId
-from pydantic import BaseModel, Field
+from carlos.edge.interface.device import CarlosDriver, DriverConfig, DriverFactory
+from loguru import logger
+from pydantic import BaseModel
from carlos.edge.device.constants import CONFIG_FILE_NAME
-
-
-class DeviceConfig(BaseModel):
- """Configures the pure device settings."""
-
- device_id: DeviceId = Field(..., description="The unique identifier of the device.")
-
+from carlos.edge.device.driver.device_metrics import DeviceMetrics
Config = TypeVar("Config", bound=BaseModel)
@@ -50,15 +43,38 @@ def write_config_file(path: Path, config: Config):
)
-def read_config() -> DeviceConfig: # pragma: no cover
+def load_drivers(config_dir: Path | None = None) -> list[CarlosDriver]:
"""Reads the configuration from the default location."""
+ config_dir = config_dir or Path.cwd()
- return read_config_file(
- path=Path.cwd() / CONFIG_FILE_NAME,
- schema=DeviceConfig,
- )
+ with open(config_dir / CONFIG_FILE_NAME, "r") as file:
+ raw_config = yaml.safe_load(file)
+
+ factory = DriverFactory()
+ try:
+ driver_configs = [factory.build(config) for config in raw_config["drivers"]]
+ except KeyError: # pragma: no cover
+ raise KeyError(
+ f"The configuration file {config_dir / CONFIG_FILE_NAME} must contain "
+ f"a 'drivers' key."
+ )
+
+ # We always want to have some device metrics
+ if not any(isinstance(driver, DeviceMetrics) for driver in driver_configs):
+ driver_configs.insert(
+ 0,
+ factory.build(
+ DriverConfig(
+ identifier="__device_metrics__",
+ driver_module=DeviceMetrics.__module__,
+ ).model_dump()
+ ),
+ )
+
+ logger.info(
+ f"Loaded {len(driver_configs)} IOs: "
+ f"{', '.join(str(io) for io in driver_configs)}"
+ )
-def write_config(config: DeviceConfig): # pragma: no cover
- """Writes the configuration to the default location."""
- write_config_file(path=Path.cwd() / CONFIG_FILE_NAME, config=config)
+ return driver_configs
diff --git a/lib/py_edge_device/carlos/edge/device/config_test.py b/lib/py_edge_device/carlos/edge/device/config_test.py
index 69883af8..2883bad7 100644
--- a/lib/py_edge_device/carlos/edge/device/config_test.py
+++ b/lib/py_edge_device/carlos/edge/device/config_test.py
@@ -1,9 +1,10 @@
from pathlib import Path
-from uuid import uuid4
import pytest
+from carlos.edge.interface.device import AnalogInput, DigitalOutput, GpioDriverConfig
-from carlos.edge.device.config import DeviceConfig, read_config_file, write_config_file
+from carlos.edge.device.config import load_drivers, read_config_file, write_config_file
+from tests.test_data import EXPECTED_IO_COUNT, TEST_DEVICE_WORKDIR
def test_config_file_io(tmp_path: Path):
@@ -12,10 +13,27 @@ def test_config_file_io(tmp_path: Path):
cfg_path = tmp_path / "config"
with pytest.raises(FileNotFoundError):
- read_config_file(cfg_path, DeviceConfig)
+ read_config_file(cfg_path, GpioDriverConfig)
- config = DeviceConfig(device_id=uuid4())
+ config = GpioDriverConfig(
+ identifier="test-config-file-io",
+ driver_module="carlos.edge.device.driver.dht11",
+ direction="input",
+ pin=7,
+ )
write_config_file(cfg_path, config)
- assert read_config_file(cfg_path, DeviceConfig) == config
+ assert read_config_file(cfg_path, GpioDriverConfig) == config
+
+
+def test_load_io():
+ """This test ensures that the IOs are loaded correctly."""
+
+ io = load_drivers(config_dir=TEST_DEVICE_WORKDIR)
+
+ assert len(io) == EXPECTED_IO_COUNT, "The number of IOs does not match."
+
+ assert all(
+ isinstance(io, (AnalogInput, DigitalOutput)) for io in io
+ ), "Not all IOs are of the correct type."
diff --git a/lib/py_edge_device/carlos/edge/device/driver/__init__.py b/lib/py_edge_device/carlos/edge/device/driver/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/lib/py_edge_device/carlos/edge/device/driver/_dhtxx.py b/lib/py_edge_device/carlos/edge/device/driver/_dhtxx.py
new file mode 100644
index 00000000..38fce119
--- /dev/null
+++ b/lib/py_edge_device/carlos/edge/device/driver/_dhtxx.py
@@ -0,0 +1,159 @@
+__all__ = ["DHTXX", "DhtConfig", "DHTType"]
+
+from abc import ABC
+from enum import StrEnum
+from time import sleep
+from typing import Literal
+
+from carlos.edge.interface.device import AnalogInput, GpioDriverConfig
+from pydantic import Field
+
+from carlos.edge.device.protocol import GPIO
+
+
+class DhtConfig(GpioDriverConfig):
+ """Configuration for a DHT sensor."""
+
+ direction: Literal["input"] = Field("input")
+
+
+class DHTType(StrEnum):
+ DHT11 = "DHT11"
+ DHT22 = "DHT22"
+
+
+class DHT:
+ """Code for Temperature & Humidity Sensor of Seeed Studio.
+
+ Code is originally from, but modified to my needs:
+ http://wiki.seeedstudio.com/Grove-TemperatureAndHumidity_Sensor/
+ """
+
+ PULSES_CNT = 41
+
+ MAX_CNT = 320
+
+ def __init__(self, dht_type: DHTType, pin: int):
+ """
+
+ :param dht_type: either DHTtype.DHT11 or DHTtype.22
+ :param pin: gpio pin where the sensor is connected to
+ """
+
+ # store the pin and type
+ self._pin = pin
+ self._dht_type = dht_type
+
+ GPIO.setup(self._pin, GPIO.OUT)
+
+ def read(self) -> tuple[float, float]:
+ """Internal read method.
+
+ http://www.ocfreaks.com/basics-interfacing-dht11-dht22-humidity-temperature-sensor-mcu/
+
+ :returns (humidity in %, temperature in °C)"""
+
+ # Send Falling signal to trigger sensor output data
+ # Wait for 20ms to collect 42 bytes data
+ GPIO.setup(self._pin, GPIO.OUT)
+ GPIO.output(self._pin, GPIO.HIGH)
+ sleep(0.2)
+ GPIO.output(self._pin, GPIO.LOW)
+ sleep(0.018)
+
+ GPIO.setup(self._pin, GPIO.IN)
+
+ # a short delay needed
+ for _ in range(10):
+ pass
+
+ # pullup by host 20-40 us
+ count = 0
+ while GPIO.input(self._pin):
+ count += 1
+ if count > self.MAX_CNT:
+ raise RuntimeError("pullup by host 20-40us failed")
+
+ pulse_cnt = [0] * (2 * self.PULSES_CNT)
+ for pulse in range(0, self.PULSES_CNT * 2, 2):
+ while not GPIO.input(self._pin):
+ pulse_cnt[pulse] += 1
+ if pulse_cnt[pulse] > self.MAX_CNT:
+ raise RuntimeError(f"pulldown by DHT timeout: {pulse}")
+
+ while GPIO.input(self._pin):
+ pulse_cnt[pulse + 1] += 1
+ if pulse_cnt[pulse + 1] > self.MAX_CNT:
+ if pulse == (self.PULSES_CNT - 1) * 2:
+ pass
+ raise RuntimeError(f"pullup by DHT timeout: {pulse}")
+
+ total_cnt = 0
+ for pulse in range(2, 2 * self.PULSES_CNT, 2):
+ total_cnt += pulse_cnt[pulse]
+
+ # Low level (50 us) average counter
+ average_cnt = total_cnt / (self.PULSES_CNT - 1)
+
+ data = ""
+ for pulse in range(3, 2 * self.PULSES_CNT, 2):
+ if pulse_cnt[pulse] > average_cnt:
+ data += "1"
+ else:
+ data += "0"
+
+ byte0 = int(data[0:8], 2)
+ byte1 = int(data[8:16], 2)
+ byte2 = int(data[16:24], 2)
+ byte3 = int(data[24:32], 2)
+ crc_byte = int(data[32:40], 2)
+
+ data_checksum = (byte0 + byte1 + byte2 + byte3) & 0xFF
+ if crc_byte != data_checksum:
+ raise RuntimeError("checksum error!")
+
+ if self._dht_type == DHTType.DHT11:
+ humidity = float(byte0)
+ temperature = float(byte2)
+ else:
+ humidity = float(int(data[0:16], 2) * 0.1)
+ temperature = float(int(data[17:32], 2) * 0.2 * (0.5 - int(data[16], 2)))
+
+ return temperature, humidity
+
+
+class DHTXX(AnalogInput, ABC):
+ """DHTXX Temperature and Humidity Sensor."""
+
+ def __init__(self, config: GpioDriverConfig):
+
+ super().__init__(config=config)
+
+ self._dht: DHT | None = None
+ self._dht_type: DHTType | None = None
+
+ def setup(self):
+ """Sets up the DHT11 sensor."""
+
+ self._dht = DHT(dht_type=self._dht_type, pin=self.config.pin)
+
+ def read(self) -> dict[str, float]:
+ """Reads the temperature and humidity."""
+
+ assert self._dht is not None, "The DHT sensor has not been initialized."
+
+ # Reading the DHT sensor is quite unreliable, as the device is not a real-time
+ # device. Thus, we just try it a couple of times and fail if it does not work.
+ last_error: Exception | None = None
+ for i in range(16):
+ try:
+ temperature, humidity = self._dht.read()
+ return {
+ "temperature": temperature,
+ "humidity": humidity,
+ }
+ except RuntimeError as ex:
+ last_error = ex
+
+ assert last_error is not None
+ raise last_error
diff --git a/lib/py_edge_device/carlos/edge/device/driver/device_metrics.py b/lib/py_edge_device/carlos/edge/device/driver/device_metrics.py
new file mode 100644
index 00000000..5d8b2a06
--- /dev/null
+++ b/lib/py_edge_device/carlos/edge/device/driver/device_metrics.py
@@ -0,0 +1,37 @@
+import psutil
+from carlos.edge.interface.device import AnalogInput, DriverConfig, DriverFactory
+
+
+class DeviceMetrics(AnalogInput):
+ """Provides the metrics of the device."""
+
+ def __init__(self, config: DriverConfig):
+
+ super().__init__(config=config)
+
+ def setup(self):
+ pass
+
+ def read(self) -> dict[str, float]:
+ """Reads the device metrics."""
+
+ return {
+ "cpu.load_percent": psutil.cpu_percent(interval=1.0),
+ "cpu.temperature": self._read_cpu_temp(),
+ "memory.usage_percent": psutil.virtual_memory().percent,
+ "disk.usage_percent": psutil.disk_usage("/").percent,
+ }
+
+ @staticmethod
+ def _read_cpu_temp() -> float:
+ """Reads the CPU temperature."""
+ try:
+ with open("/sys/class/thermal/thermal_zone0/temp") as f:
+ return float(f.read().strip()) / 1000
+ except FileNotFoundError:
+ return 0.0
+
+
+DriverFactory().register(
+ driver_module=__name__, config=DriverConfig, factory=DeviceMetrics
+)
diff --git a/lib/py_edge_device/carlos/edge/device/driver/dht11.py b/lib/py_edge_device/carlos/edge/device/driver/dht11.py
new file mode 100644
index 00000000..d91d10ac
--- /dev/null
+++ b/lib/py_edge_device/carlos/edge/device/driver/dht11.py
@@ -0,0 +1,16 @@
+from carlos.edge.interface.device import DriverFactory, GpioDriverConfig
+
+from ._dhtxx import DHTXX, DhtConfig, DHTType
+
+
+class DHT11(DHTXX):
+ """DHT11 Temperature and Humidity Sensor."""
+
+ def __init__(self, config: GpioDriverConfig):
+
+ super().__init__(config=config)
+
+ self._dht_type = DHTType.DHT11
+
+
+DriverFactory().register(driver_module=__name__, config=DhtConfig, factory=DHT11)
diff --git a/lib/py_edge_device/carlos/edge/device/driver/dht22.py b/lib/py_edge_device/carlos/edge/device/driver/dht22.py
new file mode 100644
index 00000000..f90c7a1a
--- /dev/null
+++ b/lib/py_edge_device/carlos/edge/device/driver/dht22.py
@@ -0,0 +1,16 @@
+from carlos.edge.interface.device import DriverFactory, GpioDriverConfig
+
+from ._dhtxx import DHTXX, DhtConfig, DHTType
+
+
+class DHT22(DHTXX):
+ """DHT22 Temperature and Humidity Sensor."""
+
+ def __init__(self, config: GpioDriverConfig):
+
+ super().__init__(config=config)
+
+ self._dht_type = DHTType.DHT22
+
+
+DriverFactory().register(driver_module=__name__, config=DhtConfig, factory=DHT22)
diff --git a/lib/py_edge_device/carlos/edge/device/driver/relay.py b/lib/py_edge_device/carlos/edge/device/driver/relay.py
new file mode 100644
index 00000000..a372ebd1
--- /dev/null
+++ b/lib/py_edge_device/carlos/edge/device/driver/relay.py
@@ -0,0 +1,28 @@
+from typing import Literal
+
+from carlos.edge.interface.device import DigitalOutput, DriverFactory, GpioDriverConfig
+from pydantic import Field
+
+from carlos.edge.device.protocol import GPIO
+
+
+class RelayConfig(GpioDriverConfig):
+
+ direction: Literal["output"] = Field("output")
+
+
+class Relay(DigitalOutput):
+ """Relay."""
+
+ def __init__(self, config: RelayConfig):
+ super().__init__(config=config)
+
+ def setup(self):
+ GPIO.setup(self.config.pin, GPIO.OUT, initial=GPIO.LOW)
+
+ def set(self, value: bool):
+ """Writes the value to the relay."""
+ GPIO.output(self.config.pin, value)
+
+
+DriverFactory().register(driver_module=__name__, config=RelayConfig, factory=Relay)
diff --git a/lib/py_edge_device/carlos/edge/device/driver/si1145.py b/lib/py_edge_device/carlos/edge/device/driver/si1145.py
new file mode 100644
index 00000000..ee4c00e9
--- /dev/null
+++ b/lib/py_edge_device/carlos/edge/device/driver/si1145.py
@@ -0,0 +1,500 @@
+import time
+from typing import Literal
+
+from carlos.edge.interface.device import AnalogInput, DriverFactory, I2cDriverConfig
+from pydantic import Field
+
+from carlos.edge.device.protocol import I2C
+
+
+class Si1145Config(I2cDriverConfig):
+
+ direction: Literal["input"] = Field("input")
+
+ address: Literal["0x60"] = Field("0x60")
+
+
+class SI1145(AnalogInput):
+
+ def __init__(self, config: Si1145Config):
+
+ if config.address_int != SDL_Pi_SI1145.ADDR:
+ raise ValueError(
+ f"The address of the SI1145 sensor must be 0x60. Got {config.address} instead"
+ )
+
+ super().__init__(config=config)
+
+ self._si1145: SDL_Pi_SI1145 | None = None
+
+ def setup(self):
+
+ self._si1145 = SDL_Pi_SI1145()
+
+ def read(self) -> dict[str, float]:
+ """Reads various light levels from the sensor."""
+
+ assert self._si1145 is not None, "The sensor has not been set up."
+
+ vis_raw = self._si1145.read_visible()
+ vis_lux = self._si1145.convert_visible_to_lux(vis_raw)
+ ir_raw = self._si1145.read_ir()
+ ir_lux = self._si1145.convert_ir_to_lux(ir_raw)
+ uv_idx = self._si1145.read_uv_index()
+
+ return {
+ "visual-light-raw": float(vis_raw),
+ "visual-light": float(vis_lux),
+ "infrared-light-raw": float(ir_raw),
+ "infrared-light": float(ir_lux),
+ "uv-index": float(uv_idx),
+ }
+
+
+DriverFactory().register(driver_module=__name__, config=Si1145Config, factory=SI1145)
+
+
+class SDL_Pi_SI1145:
+ """Interface to the SI1145 UV Light Sensor by Adafruit.
+
+ Note: This sensor has a static I2C Address of 0x60.
+
+ https://www.silabs.com/documents/public/data-sheets/Si1145-46-47.pdf
+
+ Modified for medium range vis, IR SDL December 2016 and
+ non Adafruit I2C (interfers with others)
+
+ Original Author: Joe Gutting
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in
+ all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ THE SOFTWARE.
+ """
+
+ # COMMANDS
+ PARAM_QUERY = 0x80
+ PARAM_SET = 0xA0
+ NOP = 0x0
+ RESET = 0x01
+ BUSADDR = 0x02
+ PS_FORCE = 0x05
+ ALS_FORCE = 0x06
+ PSALS_FORCE = 0x07
+ PS_PAUSE = 0x09
+ ALS_PAUSE = 0x0A
+ PSALS_PAUSE = 0xB
+ PS_AUTO = 0x0D
+ ALS_AUTO = 0x0E
+ PSALS_AUTO = 0x0F
+ GET_CAL = 0x12
+
+ # Parameters
+ PARAM_I2CADDR = 0x00
+ PARAM_CHLIST = 0x01
+ PARAM_CHLIST_ENUV = 0x80
+ PARAM_CHLIST_ENAUX = 0x40
+ PARAM_CHLIST_ENALSIR = 0x20
+ PARAM_CHLIST_ENALSVIS = 0x10
+ PARAM_CHLIST_ENPS1 = 0x01
+ PARAM_CHLIST_ENPS2 = 0x02
+ PARAM_CHLIST_ENPS3 = 0x04
+
+ PARAM_PSLED12SEL = 0x02
+ PARAM_PSLED12SEL_PS2NONE = 0x00
+ PARAM_PSLED12SEL_PS2LED1 = 0x10
+ PARAM_PSLED12SEL_PS2LED2 = 0x20
+ PARAM_PSLED12SEL_PS2LED3 = 0x40
+ PARAM_PSLED12SEL_PS1NONE = 0x00
+ PARAM_PSLED12SEL_PS1LED1 = 0x01
+ PARAM_PSLED12SEL_PS1LED2 = 0x02
+ PARAM_PSLED12SEL_PS1LED3 = 0x04
+
+ PARAM_PSLED3SEL = 0x03
+ PARAM_PSENCODE = 0x05
+ PARAM_ALSENCODE = 0x06
+
+ PARAM_PS1ADCMUX = 0x07
+ PARAM_PS2ADCMUX = 0x08
+ PARAM_PS3ADCMUX = 0x09
+ PARAM_PSADCOUNTER = 0x0A
+ PARAM_PSADCGAIN = 0x0B
+ PARAM_PSADCMISC = 0x0C
+ PARAM_PSADCMISC_RANGE = 0x20
+ PARAM_PSADCMISC_PSMODE = 0x04
+
+ PARAM_ALSIRADCMUX = 0x0E
+ PARAM_AUXADCMUX = 0x0F
+
+ PARAM_ALS_VIS_ADC_COUNTER = 0x10
+ PARAM_ALS_VIS_ADC_GAIN = 0x11
+ PARAM_ALS_VIS_ADC_MISC = 0x12
+ PARAM_ALS_VIS_ADC_MISC_VISRANGE = 0x10
+ # PARAM_ALS_VIS_ADC_MISC_VISRANGE = 0x00
+
+ PARAM_ALS_IR_ADC_COUNTER = 0x1D
+ PARAM_ALS_IR_ADC_GAIN = 0x1E
+ PARAM_ALS_IR_ADC_MISC = 0x1F
+ PARAM_ALS_IR_ADC_MISC_RANGE = 0x20
+ # PARAM_ALS_IR_ADC_MISC_RANGE = 0x00
+
+ PARAM_ADCCOUNTER_511CLK = 0x70
+
+ PARAM_ADCMUX_SMALLIR = 0x00
+ PARAM_ADCMUX_LARGEIR = 0x03
+
+ # REGISTERS
+ REG_PARTID = 0x00
+ REG_REVID = 0x01
+ REG_SEQID = 0x02
+
+ REG_INTCFG = 0x03
+ REG_INTCFG_INTOE = 0x01
+ REG_INTCFG_INTMODE = 0x02
+
+ REG_IRQEN = 0x04
+ REG_IRQEN_ALSEVERYSAMPLE = 0x01
+ REG_IRQEN_PS1EVERYSAMPLE = 0x04
+ REG_IRQEN_PS2EVERYSAMPLE = 0x08
+ REG_IRQEN_PS3EVERYSAMPLE = 0x10
+
+ REG_IRQMODE1 = 0x05
+ REG_IRQMODE2 = 0x06
+
+ REG_HWKEY = 0x07
+ REG_MEASRATE0 = 0x08
+ REG_MEASRATE1 = 0x09
+ REG_PSRATE = 0x0A
+ REG_PSLED21 = 0x0F
+ REG_PSLED3 = 0x10
+ REG_UCOEFF0 = 0x13
+ REG_UCOEFF1 = 0x14
+ REG_UCOEFF2 = 0x15
+ REG_UCOEFF3 = 0x16
+ REG_PARAMWR = 0x17
+ REG_COMMAND = 0x18
+ REG_RESPONSE = 0x20
+ REG_IRQSTAT = 0x21
+ REG_IRQSTAT_ALS = 0x01
+
+ REG_ALSVISDATA0 = 0x22
+ REG_ALSVISDATA1 = 0x23
+ REG_ALSIRDATA0 = 0x24
+ REG_ALSIRDATA1 = 0x25
+ REG_PS1DATA0 = 0x26
+ REG_PS1DATA1 = 0x27
+ REG_PS2DATA0 = 0x28
+ REG_PS2DATA1 = 0x29
+ REG_PS3DATA0 = 0x2A
+ REG_PS3DATA1 = 0x2B
+ REG_UVINDEX0 = 0x2C
+ REG_UVINDEX1 = 0x2D
+ REG_PARAMRD = 0x2E
+ REG_CHIPSTAT = 0x30
+
+ # I2C Address
+ ADDR = 0x60
+
+ DARK_OFFSET_VIS = 259
+ DARK_OFFSE_TIR = 253
+
+ def __init__(self):
+ """Constructor."""
+
+ self._i2c = I2C(address=SDL_Pi_SI1145.ADDR)
+
+ self._reset()
+ self._load_calibration()
+
+ # device reset
+ def _reset(self):
+ """Resets the device
+
+ :return:
+ """
+
+ with self._i2c.lock:
+ self._i2c.write8(register=SDL_Pi_SI1145.REG_MEASRATE0, value=0x00)
+ self._i2c.write8(register=SDL_Pi_SI1145.REG_MEASRATE1, value=0x00)
+ self._i2c.write8(register=SDL_Pi_SI1145.REG_IRQEN, value=0x00)
+ self._i2c.write8(register=SDL_Pi_SI1145.REG_IRQMODE1, value=0x00)
+ self._i2c.write8(register=SDL_Pi_SI1145.REG_IRQMODE2, value=0x00)
+ self._i2c.write8(register=SDL_Pi_SI1145.REG_INTCFG, value=0x00)
+ self._i2c.write8(register=SDL_Pi_SI1145.REG_IRQSTAT, value=0xFF)
+
+ self._i2c.write8(
+ register=SDL_Pi_SI1145.REG_COMMAND, value=SDL_Pi_SI1145.RESET
+ )
+ time.sleep(0.01)
+ self._i2c.write8(register=SDL_Pi_SI1145.REG_HWKEY, value=0x17)
+ time.sleep(0.01)
+
+ def write_param(self, parameter: int, value: int) -> int:
+ """Write Parameter to the Sensor."""
+
+ with self._i2c.lock:
+ self._i2c.write8(register=SDL_Pi_SI1145.REG_PARAMWR, value=value)
+ self._i2c.write8(
+ register=SDL_Pi_SI1145.REG_COMMAND,
+ value=parameter | SDL_Pi_SI1145.PARAM_SET,
+ )
+ param_val = self._i2c.read_uint8(register=SDL_Pi_SI1145.REG_PARAMRD)
+ return param_val
+
+ def read_param(self, parameter: int) -> int:
+ """Read Parameter from the Sensor."""
+
+ with self._i2c.lock:
+ self._i2c.write8(
+ register=SDL_Pi_SI1145.REG_COMMAND,
+ value=parameter | SDL_Pi_SI1145.PARAM_QUERY,
+ )
+ return self._i2c.read_uint8(register=SDL_Pi_SI1145.REG_PARAMRD)
+
+ # load calibration to sensor
+ def _load_calibration(self):
+ """Load calibration data."""
+
+ with self._i2c.lock:
+ # Enable UVindex measurement coefficients!
+ self._i2c.write8(register=SDL_Pi_SI1145.REG_UCOEFF0, value=0x29)
+ self._i2c.write8(register=SDL_Pi_SI1145.REG_UCOEFF1, value=0x89)
+ self._i2c.write8(register=SDL_Pi_SI1145.REG_UCOEFF2, value=0x02)
+ self._i2c.write8(register=SDL_Pi_SI1145.REG_UCOEFF3, value=0x00)
+
+ # Enable UV sensor
+ self.write_param(
+ parameter=SDL_Pi_SI1145.PARAM_CHLIST,
+ value=SDL_Pi_SI1145.PARAM_CHLIST_ENUV
+ | SDL_Pi_SI1145.PARAM_CHLIST_ENALSIR
+ | SDL_Pi_SI1145.PARAM_CHLIST_ENALSVIS
+ | SDL_Pi_SI1145.PARAM_CHLIST_ENPS1,
+ )
+
+ # Enable interrupt on every sample
+ self._i2c.write8(
+ register=SDL_Pi_SI1145.REG_INTCFG,
+ value=SDL_Pi_SI1145.REG_INTCFG_INTOE,
+ )
+ self._i2c.write8(
+ register=SDL_Pi_SI1145.REG_IRQEN,
+ value=SDL_Pi_SI1145.REG_IRQEN_ALSEVERYSAMPLE,
+ )
+
+ # /****************************** Prox Sense 1 */
+
+ # Program LED current
+ self._i2c.write8(
+ register=SDL_Pi_SI1145.REG_PSLED21, value=0x03
+ ) # 20mA for LED 1 only
+ self.write_param(
+ parameter=SDL_Pi_SI1145.PARAM_PS1ADCMUX,
+ value=SDL_Pi_SI1145.PARAM_ADCMUX_LARGEIR,
+ )
+
+ # Prox sensor #1 uses LED #1
+ self.write_param(
+ parameter=SDL_Pi_SI1145.PARAM_PSLED12SEL,
+ value=SDL_Pi_SI1145.PARAM_PSLED12SEL_PS1LED1,
+ )
+
+ # Fastest clocks, clock div 1
+ self.write_param(parameter=SDL_Pi_SI1145.PARAM_PSADCGAIN, value=0x00)
+
+ # Take 511 clocks to measure
+ self.write_param(
+ parameter=SDL_Pi_SI1145.PARAM_PSADCOUNTER,
+ value=SDL_Pi_SI1145.PARAM_ADCCOUNTER_511CLK,
+ )
+
+ # in prox mode, high range
+ self.write_param(
+ parameter=SDL_Pi_SI1145.PARAM_PSADCMISC,
+ value=SDL_Pi_SI1145.PARAM_PSADCMISC_RANGE
+ | SDL_Pi_SI1145.PARAM_PSADCMISC_PSMODE,
+ )
+ self.write_param(
+ parameter=SDL_Pi_SI1145.PARAM_ALSIRADCMUX,
+ value=SDL_Pi_SI1145.PARAM_ADCMUX_SMALLIR,
+ )
+
+ # Fastest clocks, clock div 1
+ self.write_param(
+ parameter=SDL_Pi_SI1145.PARAM_ALS_IR_ADC_GAIN,
+ value=0,
+ # value=4
+ )
+
+ # Take 511 clocks to measure
+ self.write_param(
+ parameter=SDL_Pi_SI1145.PARAM_ALS_IR_ADC_COUNTER,
+ value=SDL_Pi_SI1145.PARAM_ADCCOUNTER_511CLK,
+ )
+
+ # in high range mode
+ self.write_param(
+ parameter=SDL_Pi_SI1145.PARAM_ALS_IR_ADC_MISC,
+ value=0,
+ # value=SDL_Pi_SI1145.PARAM_ALS_IR_ADC_MISC_RANGE
+ )
+
+ # fastest clocks, clock div 1
+ self.write_param(
+ parameter=SDL_Pi_SI1145.PARAM_ALS_VIS_ADC_GAIN,
+ value=0,
+ # value=4
+ )
+
+ # Take 511 clocks to measure
+ self.write_param(
+ parameter=SDL_Pi_SI1145.PARAM_ALS_VIS_ADC_COUNTER,
+ value=SDL_Pi_SI1145.PARAM_ADCCOUNTER_511CLK,
+ )
+
+ # in high range mode (not normal signal)
+ self.write_param(
+ parameter=SDL_Pi_SI1145.PARAM_ALS_VIS_ADC_MISC,
+ value=0,
+ # value=SDL_Pi_SI1145.PARAM_ALS_VIS_ADC_MISC_VISRANGE
+ )
+
+ # measurement rate for auto
+ self._i2c.write8(
+ register=SDL_Pi_SI1145.REG_MEASRATE0, value=0xFF
+ ) # 255 * 31.25uS = 8ms
+
+ # auto run
+ self._i2c.write8(
+ register=SDL_Pi_SI1145.REG_COMMAND, value=SDL_Pi_SI1145.PSALS_AUTO
+ )
+
+ def read_visible(self) -> int:
+ """returns visible + IR light levels"""
+
+ with self._i2c.lock:
+ return self._i2c.read_uint16(register=SDL_Pi_SI1145.REG_ALSVISDATA0)
+
+ def read_visible_lux(self) -> float:
+ """returns visible + IR light levels in lux"""
+
+ return self.convert_visible_to_lux(self.read_visible())
+
+ def read_ir(self) -> int:
+ """returns IR light levels"""
+
+ with self._i2c.lock:
+ return self._i2c.read_uint16(register=SDL_Pi_SI1145.REG_ALSIRDATA0)
+
+ def read_ir_lux(self) -> float:
+ """returns IR light levels in lux"""
+
+ return self.convert_ir_to_lux(self.read_ir())
+
+ def read_prox(self) -> int:
+ """Returns "Proximity" - assumes an IR LED is attached to LED"""
+
+ with self._i2c.lock:
+ return self._i2c.read_uint16(register=SDL_Pi_SI1145.REG_PS1DATA0)
+
+ def read_uv(self) -> int:
+ """Returns the UV index * 100 (divide by 100 to get the index)"""
+
+ with self._i2c.lock:
+ # apply additional calibration of /10 based on sunlight
+ return self._i2c.read_uint16(register=SDL_Pi_SI1145.REG_UVINDEX0) / 10
+
+ def read_uv_index(self) -> float:
+ """Returns the UV Index."""
+
+ return self.read_uv() / 100
+
+ def convert_ir_to_lux(self, ir: int) -> float:
+ """Converts IR levels to lux."""
+
+ return self._convert_raw_to_lux(
+ raw=ir,
+ dark_offset=SDL_Pi_SI1145.DARK_OFFSE_TIR,
+ calibration_factor=50, # calibration factor to sunlight applied
+ param_adc_gain=SDL_Pi_SI1145.PARAM_ALS_IR_ADC_GAIN,
+ param_adc_misc=SDL_Pi_SI1145.PARAM_ALS_IR_ADC_MISC,
+ )
+
+ @staticmethod
+ def convert_uv_to_index(uv: int) -> float:
+ """Converts the read UV values to UV index."""
+
+ return uv / 100
+
+ def convert_visible_to_lux(self, vis: int) -> float:
+ """Converts the visible light level to lux."""
+
+ # Param 1: ALS_VIS_ADC_MISC
+ return self._convert_raw_to_lux(
+ raw=vis,
+ dark_offset=SDL_Pi_SI1145.DARK_OFFSET_VIS,
+ calibration_factor=100, # calibration to bright sunlight added
+ param_adc_gain=SDL_Pi_SI1145.PARAM_ALS_VIS_ADC_GAIN,
+ param_adc_misc=SDL_Pi_SI1145.PARAM_ALS_VIS_ADC_MISC,
+ )
+
+ def _convert_raw_to_lux(
+ self,
+ raw: int,
+ dark_offset: int,
+ calibration_factor: int,
+ param_adc_gain: int,
+ param_adc_misc: int,
+ ) -> float:
+ """Converts a raw input to Lux by applying a dark offset and a calibration
+ factor."""
+
+ raw = raw - dark_offset
+ if raw < 0:
+ raw = 0
+
+ lux = 2.44
+
+ # Get gain
+ gain = 1
+ # These are set to defaults in the Adafruit driver_module -
+ # need to change if you change them in the SI1145 driver_module
+ # range_ = self.read_param(parameter=adc_misc)
+ # if (range_ & 32) == 32:
+ # gain = 14.5
+
+ # Get sensitivity
+ multiplier = 1
+ # These are set to defaults in the Adafruit driver_module -
+ # need to change if you change them in the SI1145 driver_module
+ # sensitivity = self.read_param(parameter=adc_gain)
+ # if (sensitivity & 7) == 0:
+ # multiplier = 1
+ # if (sensitivity & 7) == 1:
+ # multiplier = 2
+ # if (sensitivity & 7) == 2:
+ # multiplier = 4
+ # if (sensitivity & 7) == 3:
+ # multiplier = 8
+ # if (sensitivity & 7) == 4:
+ # multiplier = 16
+ # if (sensitivity & 7) == 5:
+ # multiplier = 32
+ # if (sensitivity & 7) == 6:
+ # multiplier = 64
+ # if (sensitivity & 7) == 7:
+ # multiplier = 128
+
+ return raw * (gain / (lux * multiplier)) * calibration_factor
diff --git a/lib/py_edge_device/carlos/edge/device/protocol/__init__.py b/lib/py_edge_device/carlos/edge/device/protocol/__init__.py
new file mode 100644
index 00000000..8f3cbe0a
--- /dev/null
+++ b/lib/py_edge_device/carlos/edge/device/protocol/__init__.py
@@ -0,0 +1,4 @@
+__all__ = ["GPIO", "I2C", "I2cLock"]
+
+from .gpio import GPIO
+from .i2c import I2C, I2cLock
diff --git a/lib/py_edge_device/carlos/edge/device/protocol/_gpio_mock.py b/lib/py_edge_device/carlos/edge/device/protocol/_gpio_mock.py
new file mode 100644
index 00000000..0ae44470
--- /dev/null
+++ b/lib/py_edge_device/carlos/edge/device/protocol/_gpio_mock.py
@@ -0,0 +1,58 @@
+__all__ = ["GPIO"]
+
+from loguru import logger
+
+
+class GpioMock: # pragma: no cover
+
+ LOW = 0
+ HIGH = 1
+
+ IN = 0
+ OUT = 1
+
+ PUD_OFF = 0
+ PUD_DOWN = 1
+ PUD_UP = 2
+
+ BOARD = 10
+ BCM = 11
+
+ def __init__(self):
+ self._pins: list[int] = []
+
+ def setwarnings(self, state: bool):
+ pass
+
+ def setmode(self, mode: int):
+ pass
+
+ def setup(self, pin: int | list[int], mode: int):
+ if isinstance(pin, list):
+ for p in pin:
+ self.setup(p, mode)
+ return
+
+ if pin not in self._pins:
+ self._pins.append(pin)
+ logger.debug(f"Setup pin {pin} in mode {mode}")
+
+ def output(self, pin: int, state: bool):
+ if pin in self._pins:
+ logger.debug(f"Set pin {pin} to {'HIGH' if state else 'LOW'}")
+ else:
+ raise ValueError(f"Pin {pin} not set up")
+
+ def input(self, pin: int) -> bool:
+ if pin in self._pins:
+ logger.debug(f"Reading input from pin {pin}")
+ return False # Dummy value, always returning False for simplicity
+ else:
+ raise ValueError(f"Pin {pin} not set up")
+
+ def cleanup(self):
+ self._pins.clear()
+ logger.debug("Cleaned up GPIO pins")
+
+
+GPIO = GpioMock()
diff --git a/lib/py_edge_device/carlos/edge/device/protocol/gpio.py b/lib/py_edge_device/carlos/edge/device/protocol/gpio.py
new file mode 100644
index 00000000..39c612dd
--- /dev/null
+++ b/lib/py_edge_device/carlos/edge/device/protocol/gpio.py
@@ -0,0 +1,17 @@
+__all__ = ["GPIO"]
+
+import traceback
+import warnings
+
+try:
+ from RPi import GPIO # type: ignore
+except ImportError:
+ warnings.warn(
+ "RPi.GPIO not available. Fallback to mocked GPIO instead. "
+ f"{traceback.format_exc()}"
+ )
+ from ._gpio_mock import GPIO # type: ignore
+
+# Choose the GPIO mode globally
+GPIO.setmode(GPIO.BCM)
+GPIO.setwarnings(False)
diff --git a/lib/py_edge_device/carlos/edge/device/protocol/i2c.py b/lib/py_edge_device/carlos/edge/device/protocol/i2c.py
new file mode 100644
index 00000000..bfa24dd4
--- /dev/null
+++ b/lib/py_edge_device/carlos/edge/device/protocol/i2c.py
@@ -0,0 +1,158 @@
+__all__ = ["I2cLock", "I2C"]
+
+import re
+from threading import RLock
+from typing import Sequence
+
+import smbus2
+
+I2C_LOCK = RLock()
+
+
+class I2cLock: # pragma: no cover
+ def __new__(cls):
+ return I2C_LOCK
+
+
+class I2C: # pragma: no cover
+ """This class is based on, but heavily modified from, the Adafruit_I2C class"""
+
+ def __init__(self, address: int, bus: int | None = None):
+ """Creates an instance of the I2C class.
+
+ :param address: The I2C address of the device.
+ :param bus: The I2C bus number. If None, the bus number is auto-detected.
+ """
+
+ self.address = address
+ # By default, the correct I2C bus is auto-detected using /proc/cpuinfo
+ # Alternatively, you can hard-code the bus version below:
+ # self.bus = smbus2.SMBus(0); # Force I2C0 (early 256MB Pi's)
+ # self.bus = smbus2.SMBus(1); # Force I2C1 (512MB Pi's)
+ self.bus = smbus2.SMBus(
+ bus=bus if bus is not None else I2C.get_pi_i2v_bus_number()
+ )
+
+ @property
+ def lock(self) -> RLock:
+ """Returns the i2c lock"""
+ return I2C_LOCK
+
+ def write8(self, register: int, value: int):
+ """Writes an 8-bit value to the specified register/address"""
+ try:
+ self.bus.write_byte_data(
+ i2c_addr=self.address, register=register, value=value
+ )
+ except IOError:
+ raise IOError(
+ f"Error accessing 0x{self.address:0x}: Check your I2C address."
+ )
+
+ def write16(self, register: int, value: int):
+ """Writes a 16-bit value to the specified register/address pair"""
+ try:
+ self.bus.write_word_data(
+ i2c_addr=self.address, register=register, value=value
+ )
+ except IOError:
+ raise IOError(
+ f"Error accessing 0x{self.address:0x}: Check your I2C address."
+ )
+
+ def write_raw8(self, value: int):
+ """Writes an 8-bit value on the bus"""
+ try:
+ self.bus.write_byte(i2c_addr=self.address, value=value)
+ except IOError:
+ raise IOError(
+ f"Error accessing 0x{self.address:0x}: Check your I2C address."
+ )
+
+ def write_list(self, register: int, data: Sequence[int]):
+ """Writes an array of bytes using I2C format"""
+ try:
+ self.bus.write_i2c_block_data(
+ i2c_addr=self.address, register=register, data=data
+ )
+ except IOError:
+ raise IOError(
+ f"Error accessing 0x{self.address:0x}: Check your I2C address."
+ )
+
+ def read_list(self, register: int, length: int):
+ """Read a list of bytes from the I2C device"""
+ try:
+ return self.bus.read_i2c_block_data(
+ i2c_addr=self.address, register=register, length=length
+ )
+ except IOError:
+ raise IOError(
+ f"Error accessing 0x{self.address:0x}: Check your I2C address."
+ )
+
+ def read_uint8(self, register: int) -> int:
+ """Read an unsigned byte from the I2C device"""
+ try:
+ return self.bus.read_byte_data(i2c_addr=self.address, register=register)
+ except IOError:
+ raise IOError(
+ f"Error accessing 0x{self.address:0x}: Check your I2C address."
+ )
+
+ def read_int8(self, register: int):
+ """Reads a signed byte from the I2C device"""
+ result = self.read_uint8(register=register)
+ if result > 127:
+ result -= 256
+ return result
+
+ def read_uint16(self, register: int, little_endian: bool = True):
+ """Reads an unsigned 16-bit value from the I2C device"""
+ try:
+ result = self.bus.read_word_data(i2c_addr=self.address, register=register)
+ # Swap bytes if using big endian because read_word_data assumes little
+ # endian on ARM (little endian) systems.
+ if not little_endian:
+ result = ((result << 8) & 0xFF00) + (result >> 8)
+ return result
+ except IOError:
+ raise IOError(
+ f"Error accessing 0x{self.address:0x}: Check your I2C address."
+ )
+
+ def read_int16(self, register: int, little_endian=True):
+ """Reads a signed 16-bit value from the I2C device"""
+ result = self.read_uint16(register=register, little_endian=little_endian)
+ if result > 32767:
+ result -= 65536
+ return result
+
+ @staticmethod
+ def get_pi_revision() -> int:
+ """Gets the version number of the Raspberry Pi board"""
+ # Revision list available at:
+ # http://elinux.org/RPi_HardwareHistory#Board_Revision_History
+ try:
+ with open("/proc/cpuinfo", "r") as infile:
+ for line in infile:
+ # Match a line of the form "Revision : 0002" while ignoring extra
+ # info in front of the revsion
+ # (like 1000 when the Pi was over-volted).
+ match = re.match("Revision\s+:\s+.*(\w{4})$", line)
+ if match and match.group(1) in ["0000", "0002", "0003"]:
+ # Return revision 1 if revision ends with 0000, 0002 or 0003.
+ return 1
+ elif match:
+ # Assume revision 2 if revision ends with any other 4 chars.
+ return 2
+ # Couldn't find the revision, assume revision 0 like older code for
+ # compatibility.
+ return 0
+ except Exception:
+ return 0
+
+ @staticmethod
+ def get_pi_i2v_bus_number() -> int:
+ # Gets the I2C bus number /dev/i2c#
+ return 1 if I2C.get_pi_revision() > 1 else 0
diff --git a/lib/py_edge_device/carlos/edge/device/runtime.py b/lib/py_edge_device/carlos/edge/device/runtime.py
index 14949287..39ce1972 100644
--- a/lib/py_edge_device/carlos/edge/device/runtime.py
+++ b/lib/py_edge_device/carlos/edge/device/runtime.py
@@ -3,41 +3,41 @@
from datetime import timedelta
from pathlib import Path
+from typing import Self
from apscheduler import AsyncScheduler
from apscheduler.triggers.interval import IntervalTrigger
-from carlos.edge.interface import EdgeConnectionDisconnected, EdgeProtocol
+from carlos.edge.interface import DeviceId, EdgeConnectionDisconnected, EdgeProtocol
+from carlos.edge.interface.device.driver import validate_device_address_space
from carlos.edge.interface.protocol import PING
from loguru import logger
from .communication import DeviceCommunicationHandler
-from .config import DeviceConfig
+from .config import load_drivers
# We don't cover this in the unit tests. This needs to be tested in an integration test.
class DeviceRuntime: # pragma: no cover
- def __init__(self, config: DeviceConfig, protocol: EdgeProtocol):
+ def __init__(self, device_id: DeviceId, protocol: EdgeProtocol):
"""Initializes the device runtime.
- :param config: The configuration of the device.
+ :param device_id: The unique identifier of the device.
+ :param protocol: The concrete implementation of the EdgeProtocol.
"""
- self.config = config
+ self.device_id = device_id
self.protocol = protocol
+ self.driver_manager = DriverManager()
+
async def run(self):
"""Runs the device runtime."""
- logger.add(
- sink=Path.cwd() / ".carlos_data" / "device" / "device_log_{time}.log",
- level="INFO",
- rotation="50 MB",
- retention=timedelta(days=60),
- )
+ self._prepare_runtime()
communication_handler = DeviceCommunicationHandler(
- protocol=self.protocol, device_id=self.config.device_id
+ protocol=self.protocol, device_id=self.device_id
)
if not self.protocol.is_connected:
@@ -49,6 +49,7 @@ async def run(self):
kwargs={"communication_handler": communication_handler},
trigger=IntervalTrigger(minutes=1),
)
+ self.io_manager.register_tasks(scheduler=scheduler)
await scheduler.start_in_background()
while True:
@@ -60,6 +61,36 @@ async def run(self):
except EdgeConnectionDisconnected:
await self.protocol.connect()
+ def _prepare_runtime(self):
+
+ logger.add(
+ sink=Path.cwd() / ".carlos_data" / "device" / "device_log_{time}.log",
+ level="INFO",
+ rotation="50 MB",
+ retention=timedelta(days=60),
+ )
+
+
+class DriverManager: # pragma: no cover
+
+ def __init__(self):
+
+ self.drivers = load_drivers()
+ validate_device_address_space(self.drivers)
+
+ def setup(self) -> Self:
+ """Sets up the I/O peripherals."""
+ for driver in self.drivers:
+ logger.debug(f"Setting up driver {driver}.")
+ driver.setup()
+
+ return self
+
+ def register_tasks(self, scheduler: AsyncScheduler) -> Self:
+ """Registers the tasks of the I/O peripherals."""
+
+ return self
+
async def send_ping(
communication_handler: DeviceCommunicationHandler,
diff --git a/lib/py_edge_device/poetry.lock b/lib/py_edge_device/poetry.lock
index ebf98132..569195c7 100644
--- a/lib/py_edge_device/poetry.lock
+++ b/lib/py_edge_device/poetry.lock
@@ -148,7 +148,7 @@ test = ["coverage", "freezegun", "pre-commit", "pytest", "pytest-cov", "pytest-m
[[package]]
name = "carlos-edge-interface"
-version = "0.1.0"
+version = "0.1.1"
description = "Shared library to handle the edge communication."
optional = false
python-versions = ">=3.11,<3.12"
@@ -712,6 +712,34 @@ files = [
[package.dependencies]
wcwidth = "*"
+[[package]]
+name = "psutil"
+version = "5.9.8"
+description = "Cross-platform lib for process and system monitoring in Python."
+optional = false
+python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*"
+files = [
+ {file = "psutil-5.9.8-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:26bd09967ae00920df88e0352a91cff1a78f8d69b3ecabbfe733610c0af486c8"},
+ {file = "psutil-5.9.8-cp27-cp27m-manylinux2010_i686.whl", hash = "sha256:05806de88103b25903dff19bb6692bd2e714ccf9e668d050d144012055cbca73"},
+ {file = "psutil-5.9.8-cp27-cp27m-manylinux2010_x86_64.whl", hash = "sha256:611052c4bc70432ec770d5d54f64206aa7203a101ec273a0cd82418c86503bb7"},
+ {file = "psutil-5.9.8-cp27-cp27mu-manylinux2010_i686.whl", hash = "sha256:50187900d73c1381ba1454cf40308c2bf6f34268518b3f36a9b663ca87e65e36"},
+ {file = "psutil-5.9.8-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:02615ed8c5ea222323408ceba16c60e99c3f91639b07da6373fb7e6539abc56d"},
+ {file = "psutil-5.9.8-cp27-none-win32.whl", hash = "sha256:36f435891adb138ed3c9e58c6af3e2e6ca9ac2f365efe1f9cfef2794e6c93b4e"},
+ {file = "psutil-5.9.8-cp27-none-win_amd64.whl", hash = "sha256:bd1184ceb3f87651a67b2708d4c3338e9b10c5df903f2e3776b62303b26cb631"},
+ {file = "psutil-5.9.8-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:aee678c8720623dc456fa20659af736241f575d79429a0e5e9cf88ae0605cc81"},
+ {file = "psutil-5.9.8-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8cb6403ce6d8e047495a701dc7c5bd788add903f8986d523e3e20b98b733e421"},
+ {file = "psutil-5.9.8-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d06016f7f8625a1825ba3732081d77c94589dca78b7a3fc072194851e88461a4"},
+ {file = "psutil-5.9.8-cp36-cp36m-win32.whl", hash = "sha256:7d79560ad97af658a0f6adfef8b834b53f64746d45b403f225b85c5c2c140eee"},
+ {file = "psutil-5.9.8-cp36-cp36m-win_amd64.whl", hash = "sha256:27cc40c3493bb10de1be4b3f07cae4c010ce715290a5be22b98493509c6299e2"},
+ {file = "psutil-5.9.8-cp37-abi3-win32.whl", hash = "sha256:bc56c2a1b0d15aa3eaa5a60c9f3f8e3e565303b465dbf57a1b730e7a2b9844e0"},
+ {file = "psutil-5.9.8-cp37-abi3-win_amd64.whl", hash = "sha256:8db4c1b57507eef143a15a6884ca10f7c73876cdf5d51e713151c1236a0e68cf"},
+ {file = "psutil-5.9.8-cp38-abi3-macosx_11_0_arm64.whl", hash = "sha256:d16bbddf0693323b8c6123dd804100241da461e41d6e332fb0ba6058f630f8c8"},
+ {file = "psutil-5.9.8.tar.gz", hash = "sha256:6be126e3225486dff286a8fb9a06246a5253f4c7c53b475ea5f5ac934e64194c"},
+]
+
+[package.extras]
+test = ["enum34", "ipaddress", "mock", "pywin32", "wmi"]
+
[[package]]
name = "pydantic"
version = "2.7.0"
@@ -1081,6 +1109,21 @@ typing-extensions = "*"
[package.extras]
dev = ["flake8", "flake8-docstrings", "mypy", "packaging", "pre-commit", "pytest", "pytest-cov", "types-setuptools"]
+[[package]]
+name = "rpi-gpio"
+version = "0.7.1"
+description = "A module to control Raspberry Pi GPIO channels"
+optional = false
+python-versions = "*"
+files = [
+ {file = "RPi.GPIO-0.7.1-cp27-cp27mu-linux_armv6l.whl", hash = "sha256:b86b66dc02faa5461b443a1e1f0c1d209d64ab5229696f32fb3b0215e0600c8c"},
+ {file = "RPi.GPIO-0.7.1-cp310-cp310-linux_armv6l.whl", hash = "sha256:57b6c044ef5375a78c8dda27cdfadf329e76aa6943cd6cffbbbd345a9adf9ca5"},
+ {file = "RPi.GPIO-0.7.1-cp37-cp37m-linux_armv6l.whl", hash = "sha256:77afb817b81331ce3049a4b8f94a85e41b7c404d8e56b61ac0f1eb75c3120868"},
+ {file = "RPi.GPIO-0.7.1-cp38-cp38-linux_armv6l.whl", hash = "sha256:29226823da8b5ccb9001d795a944f2e00924eeae583490f0bc7317581172c624"},
+ {file = "RPi.GPIO-0.7.1-cp39-cp39-linux_armv6l.whl", hash = "sha256:15311d3b063b71dee738cd26570effc9985a952454d162937c34e08c0fc99902"},
+ {file = "RPi.GPIO-0.7.1.tar.gz", hash = "sha256:cd61c4b03c37b62bba4a5acfea9862749c33c618e0295e7e90aa4713fb373b70"},
+]
+
[[package]]
name = "ruff"
version = "0.3.7"
@@ -1118,6 +1161,22 @@ files = [
{file = "semver-3.0.2.tar.gz", hash = "sha256:6253adb39c70f6e51afed2fa7152bcd414c411286088fb4b9effb133885ab4cc"},
]
+[[package]]
+name = "smbus2"
+version = "0.4.3"
+description = "smbus2 is a drop-in replacement for smbus-cffi/smbus-python in pure Python"
+optional = false
+python-versions = "*"
+files = [
+ {file = "smbus2-0.4.3-py2.py3-none-any.whl", hash = "sha256:a2fc29cfda4081ead2ed61ef2c4fc041d71dd40a8d917e85216f44786fca2d1d"},
+ {file = "smbus2-0.4.3.tar.gz", hash = "sha256:36f2288a8e1a363cb7a7b2244ec98d880eb5a728a2494ac9c71e9de7bf6a803a"},
+]
+
+[package.extras]
+docs = ["sphinx (>=1.5.3)"]
+qa = ["flake8"]
+test = ["mock", "nose"]
+
[[package]]
name = "sniffio"
version = "1.3.1"
@@ -1242,6 +1301,17 @@ files = [
{file = "tomlkit-0.12.4.tar.gz", hash = "sha256:7ca1cfc12232806517a8515047ba66a19369e71edf2439d0f5824f91032b6cc3"},
]
+[[package]]
+name = "types-psutil"
+version = "5.9.5.20240316"
+description = "Typing stubs for psutil"
+optional = false
+python-versions = ">=3.8"
+files = [
+ {file = "types-psutil-5.9.5.20240316.tar.gz", hash = "sha256:5636f5714bb930c64bb34c4d47a59dc92f9d610b778b5364a31daa5584944848"},
+ {file = "types_psutil-5.9.5.20240316-py3-none-any.whl", hash = "sha256:2fdd64ea6e97befa546938f486732624f9255fde198b55e6f00fda236f059f64"},
+]
+
[[package]]
name = "types-pyyaml"
version = "6.0.12.20240311"
@@ -1353,4 +1423,4 @@ dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"]
[metadata]
lock-version = "2.0"
python-versions = ">=3.11,<3.12"
-content-hash = "1567e5e5cfd6b309a9ad9da8f8c1d9130717d3a8683d4c39b7f380ec62a82f83"
+content-hash = "1360c9bb3a5033c9673e55b9ddf9db48210ddf6dda4e73057645c9599f01f322"
diff --git a/lib/py_edge_device/pyproject.toml b/lib/py_edge_device/pyproject.toml
index f4248b4a..fa10b7ee 100644
--- a/lib/py_edge_device/pyproject.toml
+++ b/lib/py_edge_device/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "carlos.edge.device"
-version = "0.1.1"
+version = "0.1.2"
description = "The library for the edge device of the carlos project."
authors = ["Felix Fanghanel"]
license = "MIT"
@@ -16,10 +16,14 @@ apscheduler = "^4.0.0a4"
pydantic = "^2.6.4"
pyyaml = "^6.0.1"
"carlos.edge.interface" = {path = "../py_edge_interface"}
+psutil = "^5.9.8"
+rpi-gpio = {version = "^0.7.1", markers = "platform_machine == 'armv7l' or platform_machine == 'aarch64'"}
+smbus2 = "^0.4.3"
[tool.poetry.group.dev.dependencies]
"devtools" = {path = "../py_dev_dependencies"}
types-pyyaml = "^6.0.12.20240311"
+types-psutil = "^5.9.5.20240316"
[build-system]
@@ -27,7 +31,7 @@ requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.bumpversion]
-current_version = "0.1.1"
+current_version = "0.1.2"
commit = true
tag = false
parse = "(?P\\d+)\\.(?P\\d+)\\.(?P\\d+)(\\-(?P[a-z0-9\\.]+))?"
@@ -76,4 +80,6 @@ exclude_lines = [
omit = [
# omit all tests
"*_test.py",
+ # depends on actual device and hardware
+ "carlos/edge/device/driver/*.py"
]
diff --git a/lib/py_edge_device/tests/__init__.py b/lib/py_edge_device/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/lib/py_edge_device/tests/test_data/__init__.py b/lib/py_edge_device/tests/test_data/__init__.py
new file mode 100644
index 00000000..999b0f15
--- /dev/null
+++ b/lib/py_edge_device/tests/test_data/__init__.py
@@ -0,0 +1,6 @@
+from pathlib import Path
+
+TEST_DEVICE_WORKDIR = Path(__file__).parent
+
+EXPECTED_IO_COUNT = 11
+"""10 configured I/Os + 1 default I/O."""
diff --git a/lib/py_edge_device/tests/test_data/device_config b/lib/py_edge_device/tests/test_data/device_config
new file mode 100644
index 00000000..f0c34133
--- /dev/null
+++ b/lib/py_edge_device/tests/test_data/device_config
@@ -0,0 +1,30 @@
+drivers:
+ - identifier: temp-humi-intern
+ driver_module: dht11
+ pin: 4
+ - identifier: uv-light
+ driver_module: si1145
+ - identifier: pump
+ driver_module: relay
+ pin: 20
+ - identifier: valve-1
+ driver_module: relay
+ pin: 21
+ - identifier: valve-2
+ driver_module: relay
+ pin: 22
+ - identifier: valve-3
+ driver_module: relay
+ pin: 23
+ - identifier: valve-4
+ driver_module: carlos.edge.device.driver.relay
+ pin: 24
+ - identifier: relay-6
+ driver_module: carlos.edge.device.driver.relay
+ pin: 25
+ - identifier: relay-7
+ driver_module: carlos.edge.device.driver.relay
+ pin: 26
+ - identifier: relay-8
+ driver_module: carlos.edge.device.driver.relay
+ pin: 27
diff --git a/lib/py_edge_interface/carlos/edge/interface/device/__init__.py b/lib/py_edge_interface/carlos/edge/interface/device/__init__.py
new file mode 100644
index 00000000..6ca4894c
--- /dev/null
+++ b/lib/py_edge_interface/carlos/edge/interface/device/__init__.py
@@ -0,0 +1,12 @@
+__all__ = [
+ "AnalogInput",
+ "CarlosDriver",
+ "DigitalOutput",
+ "DriverConfig",
+ "DriverFactory",
+ "GpioDriverConfig",
+ "I2cDriverConfig",
+]
+
+from .driver import AnalogInput, CarlosDriver, DigitalOutput, DriverFactory
+from .driver_config import DriverConfig, GpioDriverConfig, I2cDriverConfig
diff --git a/lib/py_edge_interface/carlos/edge/interface/device/driver.py b/lib/py_edge_interface/carlos/edge/interface/device/driver.py
new file mode 100644
index 00000000..6ac759fa
--- /dev/null
+++ b/lib/py_edge_interface/carlos/edge/interface/device/driver.py
@@ -0,0 +1,247 @@
+__all__ = [
+ "AnalogInput",
+ "DigitalOutput",
+ "CarlosDriver",
+ "DriverFactory",
+ "validate_device_address_space",
+]
+import asyncio
+import concurrent.futures
+from abc import ABC, abstractmethod
+from collections import namedtuple
+from time import sleep
+from typing import Any, Callable, Generic, Iterable, Self, TypeVar
+
+from .driver_config import (
+ DirectionMixin,
+ DriverConfig,
+ GpioDriverConfig,
+ I2cDriverConfig,
+)
+
+DriverConfigTypeVar = TypeVar("DriverConfigTypeVar", bound=DriverConfig)
+
+
+class CarlosDriverBase(ABC, Generic[DriverConfigTypeVar]):
+ """Common base class for all drivers."""
+
+ def __init__(self, config: DriverConfigTypeVar):
+ self.config: DriverConfigTypeVar = config
+
+ def __str__(self):
+ return f"{self.config.identifier} ({self.config.driver_module})"
+
+ @property
+ def identifier(self):
+ return self.config.identifier
+
+ @abstractmethod
+ def setup(self) -> Self:
+ """Sets up the peripheral. This is required for testing. As the test runner
+ is not able to run the setup method of the peripheral outside the device."""
+ pass
+
+ @abstractmethod
+ def test(self):
+ """Tests the peripheral. This is used to validate a config by a human."""
+ pass
+
+
+class AnalogInput(CarlosDriverBase, ABC):
+ """Common base class for all analog input peripherals."""
+
+ def __init__(self, config: DriverConfigTypeVar):
+
+ if isinstance(config, DirectionMixin):
+ if config.direction != "input":
+ raise ValueError(
+ "Recieved a non-input configuration for an analog input."
+ )
+
+ super().__init__(config)
+
+ @abstractmethod
+ def read(self) -> dict[str, float]:
+ """Reads the value of the analog input. The return value is a dictionary
+ containing the value of the analog input."""
+ pass
+
+ def test(self):
+ """Tests the analog input by reading the value."""
+
+ return self.read()
+
+ async def read_async(self) -> dict[str, float]:
+ """Reads the value of the analog input asynchronously. The return value is a
+ dictionary containing the value of the analog input."""
+
+ loop = asyncio.get_running_loop()
+
+ with concurrent.futures.ThreadPoolExecutor() as pool:
+ return await loop.run_in_executor(executor=pool, func=self.read)
+
+
+class DigitalOutput(CarlosDriverBase, ABC):
+ """Common base class for all digital output peripherals."""
+
+ def __init__(self, config: DriverConfigTypeVar):
+
+ if isinstance(config, DirectionMixin):
+ if config.direction != "output":
+ raise ValueError(
+ "Recieved a non-output configuration for a digital output."
+ )
+
+ super().__init__(config)
+
+ @abstractmethod
+ def set(self, value: bool):
+ pass
+
+ def test(self):
+ """Tests the digital output by setting the value to False, then True for 1 second,
+ and then back to False."""
+
+ self.set(False)
+ self.set(True)
+ sleep(1)
+ self.set(False)
+
+
+CarlosDriver = AnalogInput | DigitalOutput
+
+DriverDefinition = namedtuple("DriverDefinition", ["config", "factory"])
+
+
+class DriverFactory:
+ """A singleton factory for io peripherals."""
+
+ _instance = None
+ _driver_index: dict[str, DriverDefinition] = {}
+
+ def __new__(cls):
+ if cls._instance is None:
+ cls._instance = super(DriverFactory, cls).__new__(cls)
+ cls._instance._driver_index = {}
+
+ return cls._instance
+
+ def register(
+ self,
+ driver_module: str,
+ config: type[DriverConfigTypeVar],
+ factory: Callable[[DriverConfigTypeVar], CarlosDriver],
+ ):
+ """Registers a peripheral with the peripheral registry.
+
+ :param driver_module: The peripheral type.
+ :param config: The peripheral configuration model.
+ :param factory: The peripheral factory function.
+ :raises ValueError: If the config is not a subclass of DriverConfig.
+ :raises RuntimeError: If the peripheral is already registered.
+ """
+
+ if not issubclass(config, DriverConfig):
+ raise ValueError(
+ "The config must be a subclass of DriverConfig. "
+ "Please ensure that the config class is a subclass of DriverConfig."
+ )
+
+ if driver_module in self._driver_index:
+ raise RuntimeError(f"The peripheral {driver_module} is already registered.")
+
+ self._driver_index[driver_module] = DriverDefinition(config, factory)
+
+ def build(self, config: dict[str, Any]) -> CarlosDriver:
+ """Builds a IO object from its configuration.
+
+ :param config: The raw configuration. The schema must adhere to the
+ DriverConfig model. But we require the full config as the ios may require
+ additional parameters.
+ :returns: The IO object.
+ :raises RuntimeError: If the driver is not registered.
+ """
+
+ io_config = DriverConfig.model_validate(config)
+
+ if io_config.driver_module not in self._driver_index:
+ raise RuntimeError(
+ f"The driver {io_config.driver_module} is not registered."
+ f"Make sure to register `DriverFactory().register(...)` "
+ f"the peripheral before building it."
+ )
+
+ driver_definition = self._driver_index[io_config.driver_module]
+
+ return driver_definition.factory(
+ driver_definition.config.model_validate(config)
+ )
+
+
+I2C_PINS = [2, 3]
+"""The Pin numbers designated for I2C communication."""
+
+
+def validate_device_address_space(drivers: Iterable[CarlosDriver]):
+ """This function ensures that the configured pins and addresses are unique.
+
+ :param drivers: The list of IOs to validate.
+ :raises ValueError: If any of the pins or addresses are configured more than once.
+ If the GPIO pins 2 and 3 are when I2C communication is configured.
+ If any of the identifiers are configured more than once.
+ """
+
+ # Ensure all identifiers are unique
+ seen_identifiers = set()
+ duplicate_identifiers = [
+ driver.identifier
+ for driver in drivers
+ if driver.identifier in seen_identifiers or seen_identifiers.add(driver.identifier) # type: ignore[func-returns-value] # noqa: E501
+ ]
+ if duplicate_identifiers:
+ raise ValueError(
+ f"The identifiers {duplicate_identifiers} are configured more than "
+ f"once. Please ensure that each identifier is configured only once."
+ )
+
+ configs = [io.config for io in drivers]
+
+ gpio_configs: list[GpioDriverConfig] = [
+ io for io in configs if isinstance(io, GpioDriverConfig)
+ ]
+
+ # Ensure GPIO pins are unique
+ seen_pins = set()
+ duplicate_gpio_pins = [
+ gpio.pin
+ for gpio in gpio_configs
+ if gpio.pin in seen_pins or seen_pins.add(gpio.pin) # type: ignore[func-returns-value] # noqa: E501
+ ]
+ if duplicate_gpio_pins:
+ raise ValueError(
+ f"The GPIO pins {duplicate_gpio_pins} are configured more than once."
+ f"Please ensure that each GPIO pin is configured only once."
+ )
+
+ i2c_configs: list[I2cDriverConfig] = [
+ io for io in configs if isinstance(io, I2cDriverConfig)
+ ]
+ if i2c_configs:
+ if any(gpio.pin in I2C_PINS for gpio in gpio_configs):
+ raise ValueError(
+ "The GPIO pins 2 and 3 are reserved for I2C communication."
+ "Please use other pins for GPIO configuration."
+ )
+
+ # Ensure I2C addresses are unique
+ seen_addresses = set()
+ duplicate_i2c_addresses = [
+ i2c.address
+ for i2c in i2c_configs
+ if i2c.address in seen_addresses or seen_addresses.add(i2c.address) # type: ignore[func-returns-value] # noqa: E501
+ ]
+ if duplicate_i2c_addresses:
+ raise ValueError(
+ f"The I2C addresses {duplicate_i2c_addresses} are configured more than "
+ f"once. Please ensure that each I2C address is configured only once."
+ )
diff --git a/lib/py_edge_interface/carlos/edge/interface/device/driver_config.py b/lib/py_edge_interface/carlos/edge/interface/device/driver_config.py
new file mode 100644
index 00000000..3cf5661e
--- /dev/null
+++ b/lib/py_edge_interface/carlos/edge/interface/device/driver_config.py
@@ -0,0 +1,131 @@
+__all__ = [
+ "DirectionMixin",
+ "DriverConfig",
+ "GpioDriverConfig",
+ "I2cDriverConfig",
+]
+
+import importlib
+from typing import Literal
+
+from pydantic import BaseModel, Field, field_validator
+
+# Pin layout
+# 5V, 5V, GND, 14, 15, 18, GND, 23, 24, GND, 25, 8, 7, ID EEPROM, GND, 12, GND, 16, 20, 21 # Outer pins # noqa
+# 3.3V, 2, 3, 4, GND, 17, 27, 22, 3.3V, 10, 9, 11, GND, ID EEPROM, 5, 6, 13, 19, 26, GND # Inner pins # noqa
+
+
+class DriverConfig(BaseModel):
+ """Common base class for all driver_module configurations."""
+
+ identifier: str = Field(
+ ...,
+ description="A unique identifier for the driver_module configuration. "
+ "It is used to allow changing addresses, pins if required later.",
+ )
+
+ driver_module: str = Field(
+ ...,
+ description="Refers to the module name that implements the IO driver_module. "
+ "Built-in drivers located in carlos.edge.device.driver module "
+ "don't need to specify the full path. Each driver_module module"
+ "must make a call to the DriverFactory.register method to register"
+ "itself.",
+ )
+
+ @field_validator("driver_module", mode="after")
+ def _validate_driver_module(cls, value):
+ """Converts a module name to a full module path."""
+
+ # check if the given module exists in the current working directory.
+ try:
+ importlib.import_module(value)
+ except ModuleNotFoundError:
+ abs_module = "carlos.edge.device.driver" + "." + value
+ try:
+ importlib.import_module(abs_module)
+ except ModuleNotFoundError:
+ raise ValueError(f"The module {value} ({abs_module}) does not exist.")
+ value = abs_module # pragma: no cover
+
+ return value
+
+
+class DirectionMixin(BaseModel):
+ direction: Literal["input", "output"] = Field(
+ ..., description="The direction of the IO."
+ )
+
+
+class GpioDriverConfig(DriverConfig, DirectionMixin):
+ """Defines a single input configuration."""
+
+ protocol: Literal["gpio"] = Field(
+ "gpio",
+ description="The communication protocol to be used for the IO.",
+ )
+
+ pin: Literal[
+ 2,
+ 3,
+ 4,
+ 5,
+ 6,
+ 7,
+ 8,
+ 9,
+ 10,
+ 11,
+ 12,
+ 13,
+ 14,
+ 15,
+ 16,
+ 17,
+ 18,
+ 19,
+ 20,
+ 21,
+ 22,
+ 23,
+ 24,
+ 25,
+ 26,
+ 27,
+ ] = Field(..., description="The GPIO pin number.")
+
+
+class I2cDriverConfig(DriverConfig, DirectionMixin):
+ """Defines a single input configuration."""
+
+ protocol: Literal["i2c"] = Field(
+ "i2c",
+ description="The communication protocol to be used for the IO.",
+ )
+
+ address: str = Field(..., description="The I2C address of the device.")
+
+ @field_validator("address", mode="before")
+ def validate_address(cls, value):
+ """Validate the I2C address."""
+
+ if isinstance(value, str):
+ if value.startswith("0x"):
+ value = value[2:]
+
+ try:
+ value = int(value, 16)
+ except ValueError:
+ raise ValueError("The I2C address must be a valid hexadecimal value.")
+
+ # first 2 are reserved
+ # address length is 7 bits, but the majority of literature defines 0x77 as max
+ if not 0x03 <= int(value) <= 0x77:
+ raise ValueError("The valid I2C address range is 0x03 to 0x77.")
+
+ return f"0x{value:02x}"
+
+ @property
+ def address_int(self):
+ """Returns the I2C address as an integer."""
+ return int(self.address, 16)
diff --git a/lib/py_edge_interface/carlos/edge/interface/device/driver_config_test.py b/lib/py_edge_interface/carlos/edge/interface/device/driver_config_test.py
new file mode 100644
index 00000000..64d25aea
--- /dev/null
+++ b/lib/py_edge_interface/carlos/edge/interface/device/driver_config_test.py
@@ -0,0 +1,77 @@
+from contextlib import nullcontext
+
+import pytest
+from pydantic import ValidationError
+
+from .driver_config import DriverConfig, I2cDriverConfig
+
+VALID_DRIVER_MODULE = __name__
+"""For a driver module to be valid, it must be importable. Everything else
+is checked else where."""
+
+
+class TestDriverConfig:
+
+ @pytest.mark.parametrize(
+ "driver_module, expected",
+ [
+ pytest.param(VALID_DRIVER_MODULE, VALID_DRIVER_MODULE, id="valid module"),
+ pytest.param("non_existing_module", ValueError, id="invalid module"),
+ ],
+ )
+ def test_driver_module_validation(
+ self, driver_module: str, expected: str | type[Exception]
+ ):
+ """This function ensures that the driver module is valid."""
+
+ if isinstance(expected, str):
+ context = nullcontext()
+ else:
+ context = pytest.raises(expected)
+
+ with context:
+ config = DriverConfig(
+ identifier="does-not-matter", driver_module=driver_module
+ )
+
+ assert config.driver_module == expected
+
+
+class TestI2cDriverConfig:
+
+ @pytest.mark.parametrize(
+ "address, expected",
+ [
+ pytest.param("0x03", "0x03", id="string: minimum address"),
+ pytest.param("0x77", "0x77", id="string: maximum address"),
+ pytest.param("0x00", ValidationError, id="string: below minimum address"),
+ pytest.param("0x78", ValidationError, id="string: above maximum address"),
+ pytest.param("1e", "0x1e", id="string: valid address without 0x"),
+ pytest.param("0xij", ValidationError, id="string: invalid hex"),
+ pytest.param("0x0A", "0x0a", id="string: upper case hex"),
+ pytest.param(0x03, "0x03", id="int: minimum address"),
+ pytest.param(0x77, "0x77", id="int: maximum address"),
+ pytest.param(0x00, ValidationError, id="int: below minimum address"),
+ pytest.param(0x78, ValidationError, id="int: above maximum address"),
+ ],
+ )
+ def test_address_validation(
+ self, address: str | int, expected: str | type[Exception]
+ ):
+ """This function ensures that the address is valid."""
+
+ if isinstance(expected, str):
+ context = nullcontext()
+ else:
+ context = pytest.raises(expected)
+
+ with context:
+ config = I2cDriverConfig(
+ identifier="does-not-matter",
+ driver_module=VALID_DRIVER_MODULE,
+ direction="input", # does not matter for this test
+ address=address,
+ )
+
+ assert config.address == expected
+ assert config.address_int == int(config.address, 16)
diff --git a/lib/py_edge_interface/carlos/edge/interface/device/driver_test.py b/lib/py_edge_interface/carlos/edge/interface/device/driver_test.py
new file mode 100644
index 00000000..9301036e
--- /dev/null
+++ b/lib/py_edge_interface/carlos/edge/interface/device/driver_test.py
@@ -0,0 +1,250 @@
+from contextlib import nullcontext
+from secrets import token_hex
+from typing import Self
+
+import pytest
+from pydantic import BaseModel
+
+from .driver import (
+ AnalogInput,
+ CarlosDriver,
+ DigitalOutput,
+ DriverFactory,
+ validate_device_address_space,
+)
+from .driver_config import GpioDriverConfig, I2cDriverConfig
+
+DRIVER_MODULE = __name__
+
+ANALOG_INPUT_VALUE = {"value": 0.0}
+ANALOG_INPUT_CONFIG = GpioDriverConfig(
+ identifier="analog-input-test",
+ driver_module=DRIVER_MODULE,
+ direction="input",
+ pin=13,
+)
+DIGITAL_OUTPUT_CONFIG = GpioDriverConfig(
+ identifier="digital-output-test",
+ driver_module=DRIVER_MODULE,
+ direction="output",
+ pin=14,
+)
+
+
+class AnalogInputTest(AnalogInput):
+
+ def setup(self):
+ pass
+
+ def read(self) -> dict[str, float]:
+ return ANALOG_INPUT_VALUE
+
+
+def test_carlos_driver_base():
+ """This test test the methods of the CarlosDriverBase class via the
+ AnalogInputTest class."""
+
+ driver = AnalogInputTest(config=ANALOG_INPUT_CONFIG)
+
+ assert isinstance(str(driver), str), "Could not convert driver to string."
+ assert (
+ driver.identifier == ANALOG_INPUT_CONFIG.identifier
+ ), "Identifier should be the same as in the config."
+
+
+def test_analog_input():
+ """This test tests the AnalogInput Interface bia the AnalogInputTest class."""
+ analog_input = AnalogInputTest(config=ANALOG_INPUT_CONFIG)
+
+ assert (
+ analog_input.test() == ANALOG_INPUT_VALUE
+ ), "Test function should return a reading."
+
+ # using output config for input should raise an error
+ with pytest.raises(ValueError):
+ AnalogInputTest(config=DIGITAL_OUTPUT_CONFIG)
+
+
+@pytest.mark.asyncio
+async def test_async_analog_input():
+ """This test tests the AnalogInput Interface bia the AnalogInputTest class."""
+ analog_input = AnalogInputTest(config=ANALOG_INPUT_CONFIG)
+
+ assert (
+ await analog_input.read_async() == ANALOG_INPUT_VALUE
+ ), "Test function should return a reading."
+
+
+class DigitalOutputTest(DigitalOutput):
+
+ def setup(self) -> Self:
+ self.pytest_state = None
+ return self
+
+ def set(self, value: bool):
+ self.pytest_state = value
+
+
+def test_digital_output():
+ """This test tests the DigitalOutput Interface bia the DigitalOutputTest class."""
+ digital_output = DigitalOutputTest(config=DIGITAL_OUTPUT_CONFIG).setup()
+
+ assert digital_output.pytest_state is None, "Initial state should be None."
+
+ digital_output.test()
+
+ assert (
+ digital_output.pytest_state is not None
+ ), "State should be set to a value after running the test."
+
+ # using input config for output should raise an error
+ with pytest.raises(ValueError):
+ DigitalOutputTest(config=ANALOG_INPUT_CONFIG)
+
+
+def test_driver_factory():
+ """This test tests the driver factory function."""
+
+ factory = DriverFactory()
+
+ raw_analog_input_config = ANALOG_INPUT_CONFIG.model_dump(mode="json")
+
+ # Sofar the AnalogInputTest class was never registered, thus the build method
+ # should raise a RuntimeError.
+ with pytest.raises(RuntimeError):
+ factory.build(raw_analog_input_config)
+
+ factory.register(
+ driver_module=DRIVER_MODULE, config=GpioDriverConfig, factory=AnalogInputTest
+ )
+
+ # Trying to register a driver with the same driver_module should raise a RuntimeError.
+ with pytest.raises(RuntimeError):
+ factory.register(
+ driver_module=DRIVER_MODULE,
+ config=GpioDriverConfig,
+ factory=AnalogInputTest,
+ )
+
+ # Passing a non DriverConfig type as config should raise a ValueError.
+ with pytest.raises(ValueError):
+ factory.register(
+ driver_module=DRIVER_MODULE + ".test",
+ config=BaseModel,
+ factory=DigitalOutputTest,
+ )
+
+ driver = factory.build(raw_analog_input_config)
+ assert isinstance(
+ driver, AnalogInputTest
+ ), "Driver should be an instance of AnalogInputTest."
+ assert isinstance(
+ driver.config, GpioDriverConfig
+ ), "Config should be an instance of GpioDriverConfig."
+
+
+@pytest.mark.parametrize(
+ "drivers, expected_exception",
+ [
+ pytest.param([AnalogInputTest(ANALOG_INPUT_CONFIG)], None, id="valid-single"),
+ pytest.param(
+ [
+ AnalogInputTest(ANALOG_INPUT_CONFIG),
+ DigitalOutputTest(DIGITAL_OUTPUT_CONFIG),
+ ],
+ None,
+ id="valid-multiple",
+ ),
+ pytest.param(
+ [
+ AnalogInputTest(
+ GpioDriverConfig(
+ identifier="test",
+ pin=2,
+ direction="input",
+ driver_module=DRIVER_MODULE,
+ )
+ )
+ ],
+ None,
+ id="valid-i2c-ping-used",
+ ),
+ pytest.param(
+ [
+ AnalogInputTest(ANALOG_INPUT_CONFIG),
+ AnalogInputTest(ANALOG_INPUT_CONFIG),
+ ],
+ ValueError,
+ id="duplicate-identifier",
+ ),
+ pytest.param(
+ [
+ AnalogInputTest(ANALOG_INPUT_CONFIG),
+ AnalogInputTest(
+ ANALOG_INPUT_CONFIG.model_copy(
+ update={"identifier": "new-identifier"}
+ )
+ ),
+ ],
+ ValueError,
+ id="duplicate-identifier-gpio-pin",
+ ),
+ pytest.param(
+ [
+ AnalogInputTest(
+ I2cDriverConfig(
+ identifier=token_hex(4),
+ driver_module=DRIVER_MODULE,
+ direction="input",
+ address="0x04",
+ )
+ ),
+ AnalogInputTest(
+ GpioDriverConfig(
+ identifier="test",
+ pin=2,
+ direction="input",
+ driver_module=DRIVER_MODULE,
+ )
+ ),
+ ],
+ ValueError,
+ id="i2c-pin-used",
+ ),
+ pytest.param(
+ [
+ AnalogInputTest(
+ I2cDriverConfig(
+ identifier=token_hex(4),
+ driver_module=DRIVER_MODULE,
+ direction="input",
+ address="0x04",
+ )
+ ),
+ AnalogInputTest(
+ I2cDriverConfig(
+ identifier=token_hex(4),
+ driver_module=DRIVER_MODULE,
+ direction="input",
+ address="0x04",
+ )
+ ),
+ ],
+ ValueError,
+ id="duplicate-i2c-address",
+ ),
+ ],
+)
+def test_validate_device_address_space(
+ drivers: list[CarlosDriver], expected_exception: type[Exception] | None
+):
+ """This method ensures that the validate_device_address_space() works
+ as expected."""
+
+ if expected_exception is not None:
+ context = pytest.raises(expected_exception)
+ else:
+ context = nullcontext()
+
+ with context:
+ validate_device_address_space(drivers)
diff --git a/lib/py_edge_interface/pyproject.toml b/lib/py_edge_interface/pyproject.toml
index f52f7c76..e3b3746f 100644
--- a/lib/py_edge_interface/pyproject.toml
+++ b/lib/py_edge_interface/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "carlos.edge.interface"
-version = "0.1.0"
+version = "0.1.1"
description = "Shared library to handle the edge communication."
authors = ["Felix Fanghanel"]
license = "MIT"
@@ -23,7 +23,7 @@ requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.bumpversion]
-current_version = "0.1.0"
+current_version = "0.1.1"
commit = true
tag = false
parse = "(?P\\d+)\\.(?P\\d+)\\.(?P\\d+)(\\-(?P[a-z0-9\\.]+))?"
diff --git a/services/device/device/cli/config.py b/services/device/device/cli/config.py
index 70946016..b634b6f4 100644
--- a/services/device/device/cli/config.py
+++ b/services/device/device/cli/config.py
@@ -3,9 +3,17 @@
from typing import TypeVar
import typer
+from carlos.edge.device.runtime import DriverManager
from pydantic import BaseModel
from pydantic_core import PydanticUndefinedType
from rich import print, print_json
+from rich.console import Console, Group
+from rich.live import Live
+from rich.panel import Panel
+from rich.pretty import Pretty
+from rich.rule import Rule
+from rich.spinner import Spinner
+from rich.traceback import Traceback
from device.connection import (
ConnectionSettings,
@@ -13,6 +21,8 @@
write_connection_settings,
)
+console = Console()
+
config_cli = typer.Typer()
@@ -54,3 +64,47 @@ def show():
print("\n[bold]Connection[/bold] configuration:")
print_json(read_connection_settings().model_dump_json())
+
+
+@config_cli.command()
+def test(): # pragma: no cover
+ """Tests the io peripherals."""
+
+ driver_result_ui = []
+ failed = []
+ passed_cnt = 0
+ with Live(Group(), refresh_per_second=4) as live:
+ for driver in DriverManager().setup().drivers:
+ driver_result_ui.append(
+ Panel(
+ Spinner(name="aesthetic", text="testing..."),
+ padding=(1, 2),
+ title=str(driver),
+ title_align="left",
+ subtitle="testing",
+ subtitle_align="right",
+ )
+ )
+ live.update(Group(*driver_result_ui))
+
+ try:
+ result = driver.test()
+ driver_result_ui[-1].renderable = (
+ Pretty(result) if result else "[green]passed[/green]"
+ )
+ driver_result_ui[-1].subtitle = "[green]passed[/green]"
+ passed_cnt += 1
+ except Exception as e:
+ failed.append(driver)
+ driver_result_ui[-1].renderable = Traceback.from_exception(
+ type(e), e, e.__traceback__
+ )
+ driver_result_ui[-1].subtitle = "[red]failed[/red]"
+
+ conclusions = []
+ if passed_cnt > 0:
+ conclusions.append(f"[green]{passed_cnt} passed[/green]")
+ if failed:
+ conclusions.append(f"[red]{len(failed)} failed[/red]")
+
+ live.update(Group(*driver_result_ui, Rule(", ".join(conclusions))))
diff --git a/services/device/device/run.py b/services/device/device/run.py
index 12f6cde4..37fede57 100644
--- a/services/device/device/run.py
+++ b/services/device/device/run.py
@@ -1,4 +1,4 @@
-from carlos.edge.device import DeviceRuntime, read_config
+from carlos.edge.device import DeviceRuntime
from carlos.edge.device.constants import VERSION
from loguru import logger
@@ -24,12 +24,11 @@ async def main(): # pragma: no cover
logger.info(f"Starting Carlos device (v{VERSION})...")
- device_config = read_config()
device_connection = read_connection_settings()
protocol = DeviceWebsocketClient(settings=device_connection)
runtime = DeviceRuntime(
- config=device_config,
+ device_id=device_connection.device_id,
protocol=protocol,
)
await runtime.run()
diff --git a/services/device/poetry.lock b/services/device/poetry.lock
index 80e56863..274ce7a3 100644
--- a/services/device/poetry.lock
+++ b/services/device/poetry.lock
@@ -159,9 +159,12 @@ develop = false
apscheduler = "^4.0.0a4"
"carlos.edge.interface" = {path = "../py_edge_interface"}
loguru = "^0.7.2"
+psutil = "^5.9.8"
pydantic = "^2.6.4"
pyyaml = "^6.0.1"
+rpi-gpio = {version = "^0.7.1", markers = "platform_machine == \"armv7l\" or platform_machine == \"aarch64\""}
semver = "^3.0.2"
+smbus2 = "^0.4.3"
[package.source]
type = "directory"
@@ -789,6 +792,34 @@ files = [
[package.dependencies]
wcwidth = "*"
+[[package]]
+name = "psutil"
+version = "5.9.8"
+description = "Cross-platform lib for process and system monitoring in Python."
+optional = false
+python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*"
+files = [
+ {file = "psutil-5.9.8-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:26bd09967ae00920df88e0352a91cff1a78f8d69b3ecabbfe733610c0af486c8"},
+ {file = "psutil-5.9.8-cp27-cp27m-manylinux2010_i686.whl", hash = "sha256:05806de88103b25903dff19bb6692bd2e714ccf9e668d050d144012055cbca73"},
+ {file = "psutil-5.9.8-cp27-cp27m-manylinux2010_x86_64.whl", hash = "sha256:611052c4bc70432ec770d5d54f64206aa7203a101ec273a0cd82418c86503bb7"},
+ {file = "psutil-5.9.8-cp27-cp27mu-manylinux2010_i686.whl", hash = "sha256:50187900d73c1381ba1454cf40308c2bf6f34268518b3f36a9b663ca87e65e36"},
+ {file = "psutil-5.9.8-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:02615ed8c5ea222323408ceba16c60e99c3f91639b07da6373fb7e6539abc56d"},
+ {file = "psutil-5.9.8-cp27-none-win32.whl", hash = "sha256:36f435891adb138ed3c9e58c6af3e2e6ca9ac2f365efe1f9cfef2794e6c93b4e"},
+ {file = "psutil-5.9.8-cp27-none-win_amd64.whl", hash = "sha256:bd1184ceb3f87651a67b2708d4c3338e9b10c5df903f2e3776b62303b26cb631"},
+ {file = "psutil-5.9.8-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:aee678c8720623dc456fa20659af736241f575d79429a0e5e9cf88ae0605cc81"},
+ {file = "psutil-5.9.8-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8cb6403ce6d8e047495a701dc7c5bd788add903f8986d523e3e20b98b733e421"},
+ {file = "psutil-5.9.8-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d06016f7f8625a1825ba3732081d77c94589dca78b7a3fc072194851e88461a4"},
+ {file = "psutil-5.9.8-cp36-cp36m-win32.whl", hash = "sha256:7d79560ad97af658a0f6adfef8b834b53f64746d45b403f225b85c5c2c140eee"},
+ {file = "psutil-5.9.8-cp36-cp36m-win_amd64.whl", hash = "sha256:27cc40c3493bb10de1be4b3f07cae4c010ce715290a5be22b98493509c6299e2"},
+ {file = "psutil-5.9.8-cp37-abi3-win32.whl", hash = "sha256:bc56c2a1b0d15aa3eaa5a60c9f3f8e3e565303b465dbf57a1b730e7a2b9844e0"},
+ {file = "psutil-5.9.8-cp37-abi3-win_amd64.whl", hash = "sha256:8db4c1b57507eef143a15a6884ca10f7c73876cdf5d51e713151c1236a0e68cf"},
+ {file = "psutil-5.9.8-cp38-abi3-macosx_11_0_arm64.whl", hash = "sha256:d16bbddf0693323b8c6123dd804100241da461e41d6e332fb0ba6058f630f8c8"},
+ {file = "psutil-5.9.8.tar.gz", hash = "sha256:6be126e3225486dff286a8fb9a06246a5253f4c7c53b475ea5f5ac934e64194c"},
+]
+
+[package.extras]
+test = ["enum34", "ipaddress", "mock", "pywin32", "wmi"]
+
[[package]]
name = "pydantic"
version = "2.7.0"
@@ -1158,6 +1189,21 @@ typing-extensions = "*"
[package.extras]
dev = ["flake8", "flake8-docstrings", "mypy", "packaging", "pre-commit", "pytest", "pytest-cov", "types-setuptools"]
+[[package]]
+name = "rpi-gpio"
+version = "0.7.1"
+description = "A module to control Raspberry Pi GPIO channels"
+optional = false
+python-versions = "*"
+files = [
+ {file = "RPi.GPIO-0.7.1-cp27-cp27mu-linux_armv6l.whl", hash = "sha256:b86b66dc02faa5461b443a1e1f0c1d209d64ab5229696f32fb3b0215e0600c8c"},
+ {file = "RPi.GPIO-0.7.1-cp310-cp310-linux_armv6l.whl", hash = "sha256:57b6c044ef5375a78c8dda27cdfadf329e76aa6943cd6cffbbbd345a9adf9ca5"},
+ {file = "RPi.GPIO-0.7.1-cp37-cp37m-linux_armv6l.whl", hash = "sha256:77afb817b81331ce3049a4b8f94a85e41b7c404d8e56b61ac0f1eb75c3120868"},
+ {file = "RPi.GPIO-0.7.1-cp38-cp38-linux_armv6l.whl", hash = "sha256:29226823da8b5ccb9001d795a944f2e00924eeae583490f0bc7317581172c624"},
+ {file = "RPi.GPIO-0.7.1-cp39-cp39-linux_armv6l.whl", hash = "sha256:15311d3b063b71dee738cd26570effc9985a952454d162937c34e08c0fc99902"},
+ {file = "RPi.GPIO-0.7.1.tar.gz", hash = "sha256:cd61c4b03c37b62bba4a5acfea9862749c33c618e0295e7e90aa4713fb373b70"},
+]
+
[[package]]
name = "ruff"
version = "0.3.7"
@@ -1206,6 +1252,22 @@ files = [
{file = "shellingham-1.5.4.tar.gz", hash = "sha256:8dbca0739d487e5bd35ab3ca4b36e11c4078f3a234bfce294b0a0291363404de"},
]
+[[package]]
+name = "smbus2"
+version = "0.4.3"
+description = "smbus2 is a drop-in replacement for smbus-cffi/smbus-python in pure Python"
+optional = false
+python-versions = "*"
+files = [
+ {file = "smbus2-0.4.3-py2.py3-none-any.whl", hash = "sha256:a2fc29cfda4081ead2ed61ef2c4fc041d71dd40a8d917e85216f44786fca2d1d"},
+ {file = "smbus2-0.4.3.tar.gz", hash = "sha256:36f2288a8e1a363cb7a7b2244ec98d880eb5a728a2494ac9c71e9de7bf6a803a"},
+]
+
+[package.extras]
+docs = ["sphinx (>=1.5.3)"]
+qa = ["flake8"]
+test = ["mock", "nose"]
+
[[package]]
name = "sniffio"
version = "1.3.1"