diff --git a/lib/py_edge_device/carlos/edge/device/config.py b/lib/py_edge_device/carlos/edge/device/config.py index d87c718c..7fa84407 100644 --- a/lib/py_edge_device/carlos/edge/device/config.py +++ b/lib/py_edge_device/carlos/edge/device/config.py @@ -11,11 +11,12 @@ from typing import TypeVar import yaml -from carlos.edge.interface.device import CarlosIO, IoFactory +from carlos.edge.interface.device import CarlosIO, IoConfig, IoFactory from loguru import logger from pydantic import BaseModel from carlos.edge.device.constants import CONFIG_FILE_NAME +from carlos.edge.device.io.device_metrics import DeviceMetrics Config = TypeVar("Config", bound=BaseModel) @@ -53,6 +54,17 @@ def load_io(config_dir: Path | None = None) -> list[CarlosIO]: ios = [io_factory.build(config) for config in raw_config.get("io", [])] + # We always want to have some device metrics + if not any(isinstance(io, DeviceMetrics) for io in ios): + ios.insert( + 0, + io_factory.build( + IoConfig( + identifier="__device_metrics__", driver=DeviceMetrics.__module__ + ).model_dump() + ), + ) + logger.info(f"Loaded {len(ios)} IOs: {', '.join(str(io) for io in ios)}") return ios diff --git a/lib/py_edge_device/carlos/edge/device/io/_dhtxx.py b/lib/py_edge_device/carlos/edge/device/io/_dhtxx.py index 5beb947c..e97e6f96 100644 --- a/lib/py_edge_device/carlos/edge/device/io/_dhtxx.py +++ b/lib/py_edge_device/carlos/edge/device/io/_dhtxx.py @@ -1,6 +1,9 @@ +from abc import ABC from enum import StrEnum from time import sleep +from carlos.edge.interface.device import AnalogInput, GpioConfig + from carlos.edge.device.protocol import GPIO @@ -107,3 +110,38 @@ def read(self) -> tuple[float, float]: temperature = float(int(data[17:32], 2) * 0.2 * (0.5 - int(data[16], 2))) return humidity, temperature + + +class DHTXX(AnalogInput, ABC): + """DHTXX Temperature and Humidity Sensor.""" + + def __init__(self, config: GpioConfig): + + super().__init__(config=config) + + self._dht: DHT | None = None + self._dht_type: DHTType | None = None + + def setup(self): + """Sets up the DHT11 sensor.""" + + self._dht = DHT(dht_type=self._dht_type, pin=self.config.pin) + + def read(self) -> dict[str, float]: + """Reads the temperature and humidity.""" + + assert self._dht is not None, "The DHT sensor has not been initialized." + + # Reading the DHT sensor is quite unreliable, as the device is not a real-time + # device. Thus, we just try it a couple of times and fail if it does not work. + for i in range(16): + try: + temperature, humidity = self._dht.read() + return { + "temperature": temperature, + "humidity": humidity, + } + except RuntimeError: + pass + + raise RuntimeError(f"Could not read {self._dht_type} sensor.") diff --git a/lib/py_edge_device/carlos/edge/device/io/dht11.py b/lib/py_edge_device/carlos/edge/device/io/dht11.py index 7460d610..5ca094e8 100644 --- a/lib/py_edge_device/carlos/edge/device/io/dht11.py +++ b/lib/py_edge_device/carlos/edge/device/io/dht11.py @@ -1,40 +1,16 @@ -from carlos.edge.interface.device import AnalogInput, GpioConfig, IoFactory +from carlos.edge.interface.device import GpioConfig, IoFactory -from ._dhtxx import DHT, DHTType +from ._dhtxx import DHTXX, DHTType -class DHT11(AnalogInput): +class DHT11(DHTXX): """DHT11 Temperature and Humidity Sensor.""" def __init__(self, config: GpioConfig): super().__init__(config=config) - self._dht: DHT | None = None - - def setup(self): - """Sets up the DHT11 sensor.""" - - self._dht = DHT(dht_type=DHTType.DHT11, pin=self.config.pin) - - def read(self) -> dict[str, float]: - """Reads the temperature and humidity.""" - - assert self._dht is not None, "The DHT sensor has not been initialized." - - # Reading the DHT sensor is quite unreliable, as the device is not a real-time - # device. Thus, we just try it a couple of times and fail if it does not work. - for i in range(16): - try: - temperature, humidity = self._dht.read() - return { - "temperature": temperature, - "humidity": humidity, - } - except RuntimeError: - pass - - raise RuntimeError("Could not read DHT11 sensor.") + self._dht_type = DHTType.DHT11 IoFactory().register(ptype=__name__, config=GpioConfig, factory=DHT11) diff --git a/lib/py_edge_device/carlos/edge/device/io/dht22.py b/lib/py_edge_device/carlos/edge/device/io/dht22.py new file mode 100644 index 00000000..81fdcadf --- /dev/null +++ b/lib/py_edge_device/carlos/edge/device/io/dht22.py @@ -0,0 +1,16 @@ +from carlos.edge.interface.device import GpioConfig, IoFactory + +from ._dhtxx import DHTXX, DHTType + + +class DHT22(DHTXX): + """DHT22 Temperature and Humidity Sensor.""" + + def __init__(self, config: GpioConfig): + + super().__init__(config=config) + + self._dht_type = DHTType.DHT22 + + +IoFactory().register(ptype=__name__, config=GpioConfig, factory=DHT22) diff --git a/lib/py_edge_device/carlos/edge/device/io/relay.py b/lib/py_edge_device/carlos/edge/device/io/relay.py index be0be8a7..dcc3f2b0 100644 --- a/lib/py_edge_device/carlos/edge/device/io/relay.py +++ b/lib/py_edge_device/carlos/edge/device/io/relay.py @@ -1,12 +1,20 @@ +from typing import Literal + from carlos.edge.interface.device import DigitalOutput, GpioConfig, IoFactory +from pydantic import Field from carlos.edge.device.protocol import GPIO +class RelayConfig(GpioConfig): + + direction: Literal["output"] = Field("output") + + class Relay(DigitalOutput): """Relay.""" - def __init__(self, config: GpioConfig): + def __init__(self, config: RelayConfig): super().__init__(config=config) def setup(self): @@ -17,4 +25,4 @@ def set(self, value: bool): GPIO.output(self.config.pin, value) -IoFactory().register(ptype=__name__, config=GpioConfig, factory=Relay) +IoFactory().register(ptype=__name__, config=RelayConfig, factory=Relay) diff --git a/lib/py_edge_device/carlos/edge/device/io/si1145.py b/lib/py_edge_device/carlos/edge/device/io/si1145.py index 8bf3f18a..a01b1674 100644 --- a/lib/py_edge_device/carlos/edge/device/io/si1145.py +++ b/lib/py_edge_device/carlos/edge/device/io/si1145.py @@ -1,13 +1,22 @@ import time +from typing import Literal from carlos.edge.interface.device import AnalogInput, I2cConfig, IoFactory +from pydantic import Field from carlos.edge.device.protocol import I2C +class Si1145Config(I2cConfig): + + direction: Literal["input"] = Field("input") + + address: Literal["0x60"] = Field("0x60") + + class SI1145(AnalogInput): - def __init__(self, config: I2cConfig): + def __init__(self, config: Si1145Config): if config.address_int != SDL_Pi_SI1145.ADDR: raise ValueError( @@ -42,7 +51,7 @@ def read(self) -> dict[str, float]: } -IoFactory().register(ptype=__name__, config=I2cConfig, factory=SI1145) +IoFactory().register(ptype=__name__, config=Si1145Config, factory=SI1145) class SDL_Pi_SI1145: diff --git a/lib/py_edge_device/carlos/edge/device/runtime.py b/lib/py_edge_device/carlos/edge/device/runtime.py index d268cbf8..442f528b 100644 --- a/lib/py_edge_device/carlos/edge/device/runtime.py +++ b/lib/py_edge_device/carlos/edge/device/runtime.py @@ -28,8 +28,6 @@ def __init__(self, device_id: DeviceId, protocol: EdgeProtocol): self.device_id = device_id self.protocol = protocol - self.ios = load_io() - async def run(self): """Runs the device runtime.""" @@ -68,11 +66,15 @@ def _prepare_runtime(self): retention=timedelta(days=60), ) - validate_device_address_space(self.ios) - self._setup_io() +class IoManager: # pragma: no cover - def _setup_io(self): + def __init__(self): + + self.ios = load_io() + validate_device_address_space(self.ios) + + def setup(self): """Sets up the I/O peripherals.""" for io in self.ios: logger.debug( @@ -80,6 +82,14 @@ def _setup_io(self): ) io.setup() + def test(self): + """Tests the I/O peripherals.""" + for io in self.ios: + logger.debug( + f"Testing I/O peripheral {io.config.identifier} ({io.config.module})." + ) + io.test() + async def send_ping( communication_handler: DeviceCommunicationHandler, diff --git a/lib/py_edge_interface/carlos/edge/interface/device/io.py b/lib/py_edge_interface/carlos/edge/interface/device/io.py index 3d70319d..7db22ae0 100644 --- a/lib/py_edge_interface/carlos/edge/interface/device/io.py +++ b/lib/py_edge_interface/carlos/edge/interface/device/io.py @@ -8,9 +8,12 @@ import asyncio import concurrent.futures from abc import ABC, abstractmethod +from asyncio import sleep from collections import namedtuple from typing import Any, Callable, Generic, Iterable, TypeVar +from loguru import logger + from .config import GpioConfig, I2cConfig, IoConfig IoConfigTypeVar = TypeVar("IoConfigTypeVar", bound=IoConfig) @@ -31,6 +34,11 @@ def setup(self): is not able to run the setup method of the peripheral outside the device.""" pass + @abstractmethod + def test(self): + """Tests the peripheral. This is used to validate a config by a human.""" + pass + class AnalogInput(CarlosPeripheral, ABC): """Common base class for all analog input peripherals.""" @@ -41,9 +49,16 @@ def read(self) -> dict[str, float]: containing the value of the analog input.""" pass + def test(self): + """Tests the analog input by reading the value.""" + + logger.info(f"Testing {self}") + data = self.read() + logger.info(f"Read data: {data}") + async def read_async(self) -> dict[str, float]: - """Reads the value of the analog input asynchronously. The return value is a dictionary - containing the value of the analog input.""" + """Reads the value of the analog input asynchronously. The return value is a + dictionary containing the value of the analog input.""" loop = asyncio.get_running_loop() @@ -58,6 +73,18 @@ class DigitalOutput(CarlosPeripheral, ABC): def set(self, value: bool): pass + def test(self): + """Tests the digital output by setting the value to False, then True for 1 second, + and then back to False.""" + + logger.info(f"Testing {self}") + self.set(False) + self.set(True) + logger.info(f"Set value to True.") + sleep(1) + self.set(False) + logger.info(f"Set value to False.") + CarlosIO = AnalogInput | DigitalOutput