diff --git a/lib/py_edge_device/carlos/edge/device/config.py b/lib/py_edge_device/carlos/edge/device/config.py index bfbd8229..7e0faebf 100644 --- a/lib/py_edge_device/carlos/edge/device/config.py +++ b/lib/py_edge_device/carlos/edge/device/config.py @@ -11,7 +11,11 @@ from typing import TypeVar import yaml -from carlos.edge.interface.device import CarlosDriver, DriverConfig, DriverFactory +from carlos.edge.interface.device import CarlosDriver, DriverFactory +from carlos.edge.interface.device.driver_config import ( + DriverConfigWithDirection, + DriverDirection, +) from loguru import logger from pydantic import BaseModel @@ -65,9 +69,10 @@ def load_drivers(config_dir: Path | None = None) -> list[CarlosDriver]: driver_configs.insert( 0, factory.build( - DriverConfig( + DriverConfigWithDirection( identifier="__device_metrics__", driver_module=DeviceMetrics.__module__, + direction=DriverDirection.INPUT, ).model_dump() ), ) diff --git a/lib/py_edge_device/carlos/edge/device/driver/_dhtxx.py b/lib/py_edge_device/carlos/edge/device/driver/_dhtxx.py index 22ce7706..6000a454 100644 --- a/lib/py_edge_device/carlos/edge/device/driver/_dhtxx.py +++ b/lib/py_edge_device/carlos/edge/device/driver/_dhtxx.py @@ -5,6 +5,8 @@ from time import sleep from carlos.edge.interface.device import AnalogInput, DriverDirection, GpioDriverConfig +from carlos.edge.interface.device.driver_config import DriverSignal +from carlos.edge.interface.units import UnitOfMeasurement from pydantic import Field from carlos.edge.device.protocol import GPIO @@ -124,6 +126,9 @@ def read(self) -> tuple[float, float]: class DHTXX(AnalogInput, ABC): """DHTXX Temperature and Humidity Sensor.""" + TEMP_SIGNAL_ID = "temperature" + HUMIDITY_SIGNAL_ID = "humidity" + def __init__(self, config: GpioDriverConfig): super().__init__(config=config) @@ -131,6 +136,20 @@ def __init__(self, config: GpioDriverConfig): self._dht: DHT | None = None self._dht_type: DHTType | None = None + def signals(self) -> list[DriverSignal]: + """Returns the signals of the DHT sensor.""" + + return [ + DriverSignal( + signal_identifier=self.TEMP_SIGNAL_ID, + unit_of_measurement=UnitOfMeasurement.CELSIUS, + ), + DriverSignal( + signal_identifier=self.HUMIDITY_SIGNAL_ID, + unit_of_measurement=UnitOfMeasurement.HUMIDITY_PERCENTAGE, + ), + ] + def setup(self): """Sets up the DHT11 sensor.""" @@ -148,8 +167,8 @@ def read(self) -> dict[str, float]: try: temperature, humidity = self._dht.read() return { - "temperature": temperature, - "humidity": humidity, + self.TEMP_SIGNAL_ID: temperature, + self.HUMIDITY_SIGNAL_ID: humidity, } except RuntimeError as ex: last_error = ex diff --git a/lib/py_edge_device/carlos/edge/device/driver/device_metrics.py b/lib/py_edge_device/carlos/edge/device/driver/device_metrics.py index 5d8b2a06..7467535a 100644 --- a/lib/py_edge_device/carlos/edge/device/driver/device_metrics.py +++ b/lib/py_edge_device/carlos/edge/device/driver/device_metrics.py @@ -1,14 +1,46 @@ import psutil -from carlos.edge.interface.device import AnalogInput, DriverConfig, DriverFactory +from carlos.edge.interface.device import AnalogInput, DriverFactory +from carlos.edge.interface.device.driver_config import ( + DriverConfigWithDirection, + DriverSignal, +) +from carlos.edge.interface.units import UnitOfMeasurement class DeviceMetrics(AnalogInput): """Provides the metrics of the device.""" - def __init__(self, config: DriverConfig): + _CPU_LOAD_SIGNAL_ID = "cpu.load_percent" + _CPU_TEMP_SIGNAL_ID = "cpu.temperature" + _MEMORY_USAGE_SIGNAL_ID = "memory.usage_percent" + _DISK_USAGE_SIGNAL_ID = "disk.usage_percent" + + def __init__(self, config: DriverConfigWithDirection): super().__init__(config=config) + def signals(self) -> list[DriverSignal]: + """Returns the signals of the DHT sensor.""" + + return [ + DriverSignal( + signal_identifier=self._CPU_LOAD_SIGNAL_ID, + unit_of_measurement=UnitOfMeasurement.PERCENTAGE, + ), + DriverSignal( + signal_identifier=self._CPU_TEMP_SIGNAL_ID, + unit_of_measurement=UnitOfMeasurement.CELSIUS, + ), + DriverSignal( + signal_identifier=self._MEMORY_USAGE_SIGNAL_ID, + unit_of_measurement=UnitOfMeasurement.PERCENTAGE, + ), + DriverSignal( + signal_identifier=self._DISK_USAGE_SIGNAL_ID, + unit_of_measurement=UnitOfMeasurement.PERCENTAGE, + ), + ] + def setup(self): pass @@ -16,10 +48,10 @@ def read(self) -> dict[str, float]: """Reads the device metrics.""" return { - "cpu.load_percent": psutil.cpu_percent(interval=1.0), - "cpu.temperature": self._read_cpu_temp(), - "memory.usage_percent": psutil.virtual_memory().percent, - "disk.usage_percent": psutil.disk_usage("/").percent, + self._CPU_LOAD_SIGNAL_ID: psutil.cpu_percent(interval=1.0), + self._CPU_TEMP_SIGNAL_ID: self._read_cpu_temp(), + self._MEMORY_USAGE_SIGNAL_ID: psutil.virtual_memory().percent, + self._DISK_USAGE_SIGNAL_ID: psutil.disk_usage("/").percent, } @staticmethod @@ -33,5 +65,5 @@ def _read_cpu_temp() -> float: DriverFactory().register( - driver_module=__name__, config=DriverConfig, factory=DeviceMetrics + driver_module=__name__, config=DriverConfigWithDirection, factory=DeviceMetrics ) diff --git a/lib/py_edge_device/carlos/edge/device/driver/relay.py b/lib/py_edge_device/carlos/edge/device/driver/relay.py index 15838b56..8971958a 100644 --- a/lib/py_edge_device/carlos/edge/device/driver/relay.py +++ b/lib/py_edge_device/carlos/edge/device/driver/relay.py @@ -7,6 +7,8 @@ DriverFactory, GpioDriverConfig, ) +from carlos.edge.interface.device.driver_config import DriverSignal +from carlos.edge.interface.units import UnitOfMeasurement from pydantic import Field from carlos.edge.device.protocol import GPIO @@ -20,11 +22,23 @@ class RelayConfig(GpioDriverConfig): class Relay(DigitalOutput, DigitalInput): """Relay.""" + _STATE_SIGNAL_ID = "state" + def __init__(self, config: RelayConfig): super().__init__(config=config) self._state = False + def signals(self) -> list[DriverSignal]: + """Returns the signals of the DHT sensor.""" + + return [ + DriverSignal( + signal_identifier=self._STATE_SIGNAL_ID, + unit_of_measurement=UnitOfMeasurement.UNIT_LESS, + ), + ] + def setup(self): # HIGH means off, LOW means @@ -42,7 +56,7 @@ def set(self, value: bool): def read(self) -> dict[str, bool]: """Reads the value of the relay.""" - return {"state": self._state} + return {self._STATE_SIGNAL_ID: self._state} def test(self): """Tests the relay by reading the value.""" @@ -50,19 +64,19 @@ def test(self): self.set(False) time.sleep(0.01) state = self.read() - if state["state"]: + if state[self._STATE_SIGNAL_ID]: raise ValueError(f"Value of relay was not set to false. Got: {state}") self.set(True) time.sleep(1) state = self.read() - if not state["state"]: + if not state[self._STATE_SIGNAL_ID]: raise ValueError(f"Value of relay was not set to true. Got: {state}") self.set(False) time.sleep(0.01) state = self.read() - if state["state"]: + if state[self._STATE_SIGNAL_ID]: raise ValueError(f"Value of relay was not set to false. Got: {state}") diff --git a/lib/py_edge_device/carlos/edge/device/driver/sht30.py b/lib/py_edge_device/carlos/edge/device/driver/sht30.py index 25e32f45..db703ed9 100644 --- a/lib/py_edge_device/carlos/edge/device/driver/sht30.py +++ b/lib/py_edge_device/carlos/edge/device/driver/sht30.py @@ -7,6 +7,8 @@ DriverFactory, I2cDriverConfig, ) +from carlos.edge.interface.device.driver_config import DriverSignal +from carlos.edge.interface.units import UnitOfMeasurement from pydantic import Field from carlos.edge.device.protocol import I2C @@ -37,6 +39,9 @@ class SHT30(AnalogInput): PARAM_HIGH_REPEATABLITY = 0x06 """Marks the measurement as high repeatability.""" + _TEMPERATURE_SIGNAL_ID = "temperature" + _HUMIDITY_SIGNAL_ID = "humidity" + def __init__(self, config: SHT30Config): if config.address_int not in SHT30.I2C_ADDRESSES: raise ValueError( @@ -48,6 +53,20 @@ def __init__(self, config: SHT30Config): self._i2c: I2C | None = None + def signals(self) -> list[DriverSignal]: + """Returns the signals of the DHT sensor.""" + + return [ + DriverSignal( + signal_identifier=self._TEMPERATURE_SIGNAL_ID, + unit_of_measurement=UnitOfMeasurement.CELSIUS, + ), + DriverSignal( + signal_identifier=self._HUMIDITY_SIGNAL_ID, + unit_of_measurement=UnitOfMeasurement.HUMIDITY_PERCENTAGE, + ), + ] + def setup(self): self._i2c = I2C(address=self.config.address_int) @@ -57,8 +76,8 @@ def read(self) -> dict[str, float]: read_delay_ms = 100 humidity, temperature = self._get_measurement(read_delay_ms=read_delay_ms) return { - "temperature": float(temperature), - "humidity": float(humidity), + self._TEMPERATURE_SIGNAL_ID: float(temperature), + self._HUMIDITY_SIGNAL_ID: float(humidity), } def _get_measurement(self, read_delay_ms: int = 100) -> tuple[float, float]: diff --git a/lib/py_edge_device/carlos/edge/device/driver/si1145.py b/lib/py_edge_device/carlos/edge/device/driver/si1145.py index 462e53b8..302847fa 100644 --- a/lib/py_edge_device/carlos/edge/device/driver/si1145.py +++ b/lib/py_edge_device/carlos/edge/device/driver/si1145.py @@ -7,6 +7,8 @@ DriverFactory, I2cDriverConfig, ) +from carlos.edge.interface.device.driver_config import DriverSignal +from carlos.edge.interface.units import UnitOfMeasurement from pydantic import Field from carlos.edge.device.protocol import I2C @@ -21,6 +23,12 @@ class Si1145Config(I2cDriverConfig): class SI1145(AnalogInput): + _VISUAL_LIGHT_SIGNAL_ID = "visual-light" + _VISUAL_LIGHT_RAW_SIGNAL_ID = "visual-light-raw" + _INFRARED_LIGHT_SIGNAL_ID = "infrared-light" + _INFRARED_LIGHT_RAW_SIGNAL_ID = "infrared-light-raw" + _UV_INDEX_SIGNAL_ID = "uv-index" + def __init__(self, config: Si1145Config): if config.address_int != SDL_Pi_SI1145.ADDR: @@ -32,6 +40,32 @@ def __init__(self, config: Si1145Config): self._si1145: SDL_Pi_SI1145 | None = None + def signals(self) -> list[DriverSignal]: + """Returns the signals of the DHT sensor.""" + + return [ + DriverSignal( + signal_identifier=self._VISUAL_LIGHT_SIGNAL_ID, + unit_of_measurement=UnitOfMeasurement.LUX, + ), + DriverSignal( + signal_identifier=self._VISUAL_LIGHT_RAW_SIGNAL_ID, + unit_of_measurement=UnitOfMeasurement.UNIT_LESS, + ), + DriverSignal( + signal_identifier=self._INFRARED_LIGHT_SIGNAL_ID, + unit_of_measurement=UnitOfMeasurement.LUX, + ), + DriverSignal( + signal_identifier=self._INFRARED_LIGHT_RAW_SIGNAL_ID, + unit_of_measurement=UnitOfMeasurement.UNIT_LESS, + ), + DriverSignal( + signal_identifier=self._UV_INDEX_SIGNAL_ID, + unit_of_measurement=UnitOfMeasurement.UNIT_LESS, + ), + ] + def setup(self): self._si1145 = SDL_Pi_SI1145() @@ -48,11 +82,11 @@ def read(self) -> dict[str, float]: uv_idx = self._si1145.read_uv_index() return { - "visual-light-raw": float(vis_raw), - "visual-light": float(vis_lux), - "infrared-light-raw": float(ir_raw), - "infrared-light": float(ir_lux), - "uv-index": float(uv_idx), + self._VISUAL_LIGHT_RAW_SIGNAL_ID: float(vis_raw), + self._VISUAL_LIGHT_SIGNAL_ID: float(vis_lux), + self._INFRARED_LIGHT_RAW_SIGNAL_ID: float(ir_raw), + self._INFRARED_LIGHT_SIGNAL_ID: float(ir_lux), + self._UV_INDEX_SIGNAL_ID: float(uv_idx), } diff --git a/lib/py_edge_device/carlos/edge/device/driver_manager.py b/lib/py_edge_device/carlos/edge/device/driver_manager.py new file mode 100644 index 00000000..a004ba5a --- /dev/null +++ b/lib/py_edge_device/carlos/edge/device/driver_manager.py @@ -0,0 +1,83 @@ +from datetime import UTC, datetime +from typing import Self + +from apscheduler import AsyncScheduler +from apscheduler.triggers.interval import IntervalTrigger +from carlos.edge.interface.device.driver import ( + InputDriver, + validate_device_address_space, +) +from carlos.edge.interface.device.driver_config import DriverMetadata +from loguru import logger + +from carlos.edge.device.config import load_drivers +from carlos.edge.device.storage.blackbox import Blackbox +from carlos.edge.device.storage.connection import get_async_storage_engine + +INPUT_SAMPLE_INTERVAL = 2.5 * 60 # 2.5 minutes +"""The time between two consecutive samples of the input devices in seconds.""" + + +class DriverManager: # pragma: no cover + + def __init__(self): + + self.drivers = {driver.identifier: driver for driver in load_drivers()} + validate_device_address_space(self.drivers.values()) + + self.blackbox = Blackbox(engine=get_async_storage_engine()) + + @property + def driver_metadata(self) -> list[DriverMetadata]: + """Returns the metadata of the registered drivers.""" + + return [ + DriverMetadata( + identifier=driver.identifier, + direction=driver.direction, + driver_module=driver.config.driver_module, + signals=driver.signals, + ) + for driver in self.drivers.values() + ] + + def setup(self) -> Self: + """Sets up the I/O peripherals.""" + for driver in self.drivers.values(): + logger.debug(f"Setting up driver {driver}.") + driver.setup() + + return self + + async def register_tasks(self, scheduler: AsyncScheduler) -> Self: + """Registers the tasks of the I/O peripherals.""" + + for driver in self.drivers.values(): + if isinstance(driver, InputDriver): + await scheduler.add_schedule( + func_or_task_id=self.read_input, + kwargs={"driver_identifier": driver.identifier}, + trigger=IntervalTrigger(seconds=INPUT_SAMPLE_INTERVAL), + ) + + return self + + async def read_input(self, driver_identifier: str): + """Reads the value of the input driver.""" + + logger.debug(f"Reading data from driver {driver_identifier}.") + + read_start = datetime.now(tz=UTC) + data = await self.drivers[driver_identifier].read_async() + read_end = datetime.now(tz=UTC) + # We assume that the actual read time is in the middle of the + # start and end time. + read_act = read_start + (read_end - read_start) / 2 + + logger.debug(f"Received data from driver {driver_identifier}: {data}") + + await self.blackbox.record( + driver_identifier=driver_identifier, + read_timestamp=read_act, + data=data, + ) diff --git a/lib/py_edge_device/carlos/edge/device/runtime.py b/lib/py_edge_device/carlos/edge/device/runtime.py index 3c4a844c..bbddb1ea 100644 --- a/lib/py_edge_device/carlos/edge/device/runtime.py +++ b/lib/py_edge_device/carlos/edge/device/runtime.py @@ -3,24 +3,23 @@ import asyncio import signal -from datetime import UTC, datetime, timedelta -from typing import Self +from datetime import timedelta from apscheduler import AsyncScheduler from apscheduler.triggers.interval import IntervalTrigger -from carlos.edge.interface import DeviceId, EdgeProtocol -from carlos.edge.interface.device.driver import ( - InputDriver, - validate_device_address_space, +from carlos.edge.interface import ( + PING, + CarlosMessage, + DeviceConfigPayload, + DeviceId, + EdgeProtocol, + MessageType, ) -from carlos.edge.interface.protocol import PING from loguru import logger from .communication import ClientEdgeCommunicationHandler -from .config import load_drivers from .constants import LOCAL_DEVICE_STORAGE_PATH -from .storage.blackbox import Blackbox -from .storage.connection import get_async_storage_engine +from .driver_manager import DriverManager from .storage.migration import alembic_upgrade @@ -43,6 +42,21 @@ def __init__(self, device_id: DeviceId, protocol: EdgeProtocol): self.task_scheduler: AsyncScheduler | None = None + async def on_connect(self, protocol: EdgeProtocol): + """This method is called when the protocol connects to the server. It sends the + device configuration to the server.""" + + logger.info("Connected to server.") + + await protocol.send( + CarlosMessage( + message_type=MessageType.DEVICE_CONFIG, + payload=DeviceConfigPayload( + drivers=self.driver_manager.driver_metadata + ), + ) + ) + async def run(self): """Runs the device runtime.""" @@ -133,61 +147,6 @@ async def _send_pending_data(self): pass -INPUT_SAMPLE_INTERVAL = 2.5 * 60 # 2.5 minutes -"""The time between two consecutive samples of the input devices in seconds.""" - - -class DriverManager: # pragma: no cover - - def __init__(self): - - self.drivers = {driver.identifier: driver for driver in load_drivers()} - validate_device_address_space(self.drivers.values()) - - self.blackbox = Blackbox(engine=get_async_storage_engine()) - - def setup(self) -> Self: - """Sets up the I/O peripherals.""" - for driver in self.drivers.values(): - logger.debug(f"Setting up driver {driver}.") - driver.setup() - - return self - - async def register_tasks(self, scheduler: AsyncScheduler) -> Self: - """Registers the tasks of the I/O peripherals.""" - - for driver in self.drivers.values(): - if isinstance(driver, InputDriver): - await scheduler.add_schedule( - func_or_task_id=self.read_input, - kwargs={"driver_identifier": driver.identifier}, - trigger=IntervalTrigger(seconds=INPUT_SAMPLE_INTERVAL), - ) - - return self - - async def read_input(self, driver_identifier: str): - """Reads the value of the input driver.""" - - logger.debug(f"Reading data from driver {driver_identifier}.") - - read_start = datetime.now(tz=UTC) - data = await self.drivers[driver_identifier].read_async() - read_end = datetime.now(tz=UTC) - # We assume that the actual read time is in the middle of the - # start and end time. - read_act = read_start + (read_end - read_start) / 2 - - logger.debug(f"Received data from driver {driver_identifier}: {data}") - - await self.blackbox.record( - driver_identifier=driver_identifier, - read_timestamp=read_act, - data=data, - ) - - async def send_ping( communication_handler: ClientEdgeCommunicationHandler, ): # pragma: no cover diff --git a/lib/py_edge_device/pyproject.toml b/lib/py_edge_device/pyproject.toml index cb28956c..3e316f6d 100644 --- a/lib/py_edge_device/pyproject.toml +++ b/lib/py_edge_device/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "carlos.edge.device" -version = "0.1.9" +version = "0.1.10" description = "The library for the edge device of the carlos project." authors = ["Felix Fanghanel"] license = "MIT" @@ -35,7 +35,7 @@ requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" [tool.bumpversion] -current_version = "0.1.9" +current_version = "0.1.10" commit = true tag = false parse = "(?P\\d+)\\.(?P\\d+)\\.(?P\\d+)(\\-(?P[a-z0-9\\.]+))?" diff --git a/lib/py_edge_interface/carlos/edge/interface/__init__.py b/lib/py_edge_interface/carlos/edge/interface/__init__.py index f840e2e0..8dd47a90 100644 --- a/lib/py_edge_interface/carlos/edge/interface/__init__.py +++ b/lib/py_edge_interface/carlos/edge/interface/__init__.py @@ -1,5 +1,6 @@ __all__ = [ "CarlosMessage", + "DeviceConfigPayload", "DeviceId", "EdgeCommunicationHandler", "EdgeConnectionDisconnected", @@ -8,6 +9,8 @@ "EdgeVersionPayload", "MessageHandler", "MessageType", + "PING", + "PONG", "PingMessage", "PongMessage", "get_websocket_endpoint", @@ -17,12 +20,15 @@ from .endpoint import get_websocket_endpoint, get_websocket_token_endpoint from .messages import ( CarlosMessage, + DeviceConfigPayload, EdgeVersionPayload, MessageType, PingMessage, PongMessage, ) from .protocol import ( + PING, + PONG, EdgeCommunicationHandler, EdgeConnectionDisconnected, EdgeConnectionFailed, diff --git a/lib/py_edge_interface/carlos/edge/interface/device/driver.py b/lib/py_edge_interface/carlos/edge/interface/device/driver.py index 6757741c..35b7c0aa 100644 --- a/lib/py_edge_interface/carlos/edge/interface/device/driver.py +++ b/lib/py_edge_interface/carlos/edge/interface/device/driver.py @@ -20,12 +20,14 @@ from .driver_config import ( DirectionMixin, DriverConfig, + DriverConfigWithDirection, DriverDirection, + DriverSignal, GpioDriverConfig, I2cDriverConfig, ) -DriverConfigTypeVar = TypeVar("DriverConfigTypeVar", bound=DriverConfig) +DriverConfigTypeVar = TypeVar("DriverConfigTypeVar", bound=DriverConfigWithDirection) DRIVER_THREAD_POOL = concurrent.futures.ThreadPoolExecutor( @@ -46,6 +48,15 @@ def __str__(self): def identifier(self): return self.config.identifier + @property + def direction(self) -> DriverDirection: + return self.config.direction + + @abstractmethod + def signals(self) -> list[DriverSignal]: + """Returns the signals of the peripheral.""" + pass + @abstractmethod def setup(self) -> Self: """Sets up the peripheral. This is required for testing. As the test runner diff --git a/lib/py_edge_interface/carlos/edge/interface/device/driver_config.py b/lib/py_edge_interface/carlos/edge/interface/device/driver_config.py index d568fe95..7fa9ee97 100644 --- a/lib/py_edge_interface/carlos/edge/interface/device/driver_config.py +++ b/lib/py_edge_interface/carlos/edge/interface/device/driver_config.py @@ -1,17 +1,23 @@ __all__ = [ + "DRIVER_IDENTIFIER_LENGTH", "DirectionMixin", "DriverConfig", + "DriverConfigWithDirection", "DriverDirection", + "DriverMetadata", + "DriverSignal", "GpioDriverConfig", "I2cDriverConfig", - "DRIVER_IDENTIFIER_LENGTH", ] import importlib from enum import StrEnum from typing import Literal -from pydantic import BaseModel, Field, field_validator +from pydantic import BaseModel, Field, computed_field, field_validator + +from carlos.edge.interface.types import CarlosSchema +from carlos.edge.interface.units import PhysicalQuantity, UnitOfMeasurement # Pin layout # 5V, 5V, GND, 14, 15, 18, GND, 23, 24, GND, 25, 8, 7, ID EEPROM, GND, 12, GND, 16, 20, 21 # Outer pins # noqa @@ -20,7 +26,7 @@ DRIVER_IDENTIFIER_LENGTH = 64 -class DriverConfig(BaseModel): +class _DriverConfigMixin(CarlosSchema): """Common base class for all driver_module configurations.""" identifier: str = Field( @@ -39,6 +45,10 @@ class DriverConfig(BaseModel): "itself.", ) + +class DriverConfig(_DriverConfigMixin): + """Common base class for all driver_module configurations.""" + @field_validator("driver_module", mode="after") def _validate_driver_module(cls, value): """Converts a module name to a full module path.""" @@ -69,7 +79,11 @@ class DirectionMixin(BaseModel): direction: DriverDirection = Field(..., description="The direction of the IO.") -class GpioDriverConfig(DriverConfig, DirectionMixin): +class DriverConfigWithDirection(DriverConfig, DirectionMixin): + pass + + +class GpioDriverConfig(DriverConfigWithDirection): """Defines a single input configuration.""" protocol: Literal["gpio"] = Field( @@ -107,7 +121,7 @@ class GpioDriverConfig(DriverConfig, DirectionMixin): ] = Field(..., description="The GPIO pin number.") -class I2cDriverConfig(DriverConfig, DirectionMixin): +class I2cDriverConfig(DriverConfigWithDirection): """Defines a single input configuration.""" protocol: Literal["i2c"] = Field( @@ -141,3 +155,32 @@ def validate_address(cls, value): def address_int(self): """Returns the I2C address as an integer.""" return int(self.address, 16) + + +class DriverSignal(CarlosSchema): + """Defines the signals that the driver provides.""" + + signal_identifier: str = Field( + ..., + description="A unique identifier of the signal within the context of a driver.", + ) + + unit_of_measurement: UnitOfMeasurement = Field( + ..., + description="The unit of measurement of the signal.", + ) + + @computed_field # type: ignore + @property + def physical_quantity(self) -> PhysicalQuantity: # pragma: no cover + """Returns the physical quantity of this signal.""" + return self.unit_of_measurement.physical_quantity + + +class DriverMetadata(_DriverConfigMixin, DirectionMixin): + """Provides the metadata for the driver.""" + + signals: list[DriverSignal] = Field( + ..., + description="The signals that the driver provides.", + ) diff --git a/lib/py_edge_interface/carlos/edge/interface/device/driver_test.py b/lib/py_edge_interface/carlos/edge/interface/device/driver_test.py index 9bd59997..0839b024 100644 --- a/lib/py_edge_interface/carlos/edge/interface/device/driver_test.py +++ b/lib/py_edge_interface/carlos/edge/interface/device/driver_test.py @@ -5,6 +5,7 @@ import pytest from pydantic import BaseModel +from ..units import UnitOfMeasurement from .driver import ( AnalogInput, CarlosDriver, @@ -12,7 +13,12 @@ DriverFactory, validate_device_address_space, ) -from .driver_config import GpioDriverConfig, I2cDriverConfig +from .driver_config import ( + DriverDirection, + DriverSignal, + GpioDriverConfig, + I2cDriverConfig, +) DRIVER_MODULE = __name__ @@ -33,6 +39,14 @@ class AnalogInputTest(AnalogInput): + def signals(self) -> list[DriverSignal]: + return [ + DriverSignal( + signal_identifier=key, unit_of_measurement=UnitOfMeasurement.UNIT_LESS + ) + for key in ANALOG_INPUT_VALUE.keys() + ] + def setup(self): pass @@ -56,6 +70,8 @@ def test_analog_input(): """This test tests the AnalogInput Interface bia the AnalogInputTest class.""" analog_input = AnalogInputTest(config=ANALOG_INPUT_CONFIG) + assert analog_input.direction == DriverDirection.INPUT, "Direction should be INPUT." + assert ( analog_input.test() == ANALOG_INPUT_VALUE ), "Test function should return a reading." @@ -77,12 +93,20 @@ async def test_async_analog_input(): class DigitalOutputTest(DigitalOutput): + def signals(self) -> list[DriverSignal]: + return [ + DriverSignal( + signal_identifier="state", + unit_of_measurement=UnitOfMeasurement.UNIT_LESS, + ) + ] + def setup(self) -> Self: self.pytest_state = None return self - def read(self) -> bool: - return self.pytest_state + def read(self) -> dict[str, float]: + return {"state": self.pytest_state} def set(self, value: bool): self.pytest_state = value @@ -92,12 +116,16 @@ def test_digital_output(): """This test tests the DigitalOutput Interface via the DigitalOutputTest class.""" digital_output = DigitalOutputTest(config=DIGITAL_OUTPUT_CONFIG).setup() - assert digital_output.read() is None, "Initial state should be None." + assert ( + digital_output.direction == DriverDirection.OUTPUT + ), "Direction should be OUTPUT." + + assert digital_output.read()["state"] is None, "Initial state should be None." digital_output.test() assert ( - digital_output.read() is not None + digital_output.read()["state"] is not None ), "State should be set to a value after running the test." # using input config for output should raise an error diff --git a/lib/py_edge_interface/carlos/edge/interface/messages.py b/lib/py_edge_interface/carlos/edge/interface/messages.py index 491d53f8..00293123 100644 --- a/lib/py_edge_interface/carlos/edge/interface/messages.py +++ b/lib/py_edge_interface/carlos/edge/interface/messages.py @@ -4,9 +4,10 @@ __all__ = [ "CarlosMessage", "CarlosPayload", + "DeviceConfigPayload", "DriverDataPayload", - "EdgeVersionPayload", "DriverTimeseries", + "EdgeVersionPayload", "MessageType", "PingMessage", "PongMessage", @@ -19,6 +20,7 @@ from pydantic import Field, ValidationError, model_validator +from carlos.edge.interface.device.driver_config import DriverMetadata from carlos.edge.interface.types import CarlosSchema @@ -37,6 +39,12 @@ class MessageType(str, Enum): """Requests the communication partner to respond with the version of the Carlos Edge device. This is used to determine if a Carlos Edge device requires an update.""" + DEVICE_CONFIG = "device_config" + """Send by the client after successfully connecting to the server. + This message contains the device config. It is used by the server to create the + required data structures to store the devices data and to know which data can be + received from and by the device.""" + DRIVER_DATA = "driver_data" """Send by the device. This message contains a collection of samples measured by the device.""" @@ -218,10 +226,23 @@ class DriverDataAckPayload(CarlosPayloadBase): ) +class DeviceConfigPayload(CarlosPayloadBase): + """Send by the client after successfully connecting to the server. + This message contains the device config. It is used by the server to create the + required data structures to store the devices data and to know which data can be + received from and by the device.""" + + drivers: list[DriverMetadata] = Field( + ..., + description="The driver configuration for the device.", + ) + + MESSAGE_TYPE_TO_MODEL: dict[MessageType, type[CarlosPayloadBase] | None] = { MessageType.PING: PingMessage, MessageType.PONG: PongMessage, MessageType.EDGE_VERSION: EdgeVersionPayload, + MessageType.DEVICE_CONFIG: DeviceConfigPayload, MessageType.DRIVER_DATA: DriverDataPayload, MessageType.DRIVER_DATA_ACK: DriverDataAckPayload, } diff --git a/lib/py_edge_interface/carlos/edge/interface/plugin_pytest.py b/lib/py_edge_interface/carlos/edge/interface/plugin_pytest.py index f8f0add9..8d24db1a 100644 --- a/lib/py_edge_interface/carlos/edge/interface/plugin_pytest.py +++ b/lib/py_edge_interface/carlos/edge/interface/plugin_pytest.py @@ -5,7 +5,10 @@ import pytest from carlos.edge.interface import CarlosMessage, EdgeProtocol -from carlos.edge.interface.protocol import EdgeConnectionDisconnected +from carlos.edge.interface.protocol import ( + EdgeConnectionDisconnected, + EdgeProtocolCallback, +) class EdgeProtocolTestingConnection(EdgeProtocol): @@ -13,7 +16,14 @@ class EdgeProtocolTestingConnection(EdgeProtocol): utilizing an async FIFO queue to simulate the communication between the server and the client. This is useful for testing purposes.""" - def __init__(self, send_queue: Queue[str], receive_queue: Queue[str]): + def __init__( + self, + send_queue: Queue[str], + receive_queue: Queue[str], + on_connect: EdgeProtocolCallback | None = None, + ): + + super().__init__(on_connect=on_connect) self._send_queue = send_queue self._receive_queue = receive_queue @@ -37,6 +47,9 @@ def connect(self): :raises EdgeConnectionFailed: If the connection attempt fails.""" self._is_connected = True + if self.on_connect: + self.on_connect(self) # pragma: no cover + async def send(self, message: CarlosMessage) -> None: """Send data to the other end of the connection. diff --git a/lib/py_edge_interface/carlos/edge/interface/protocol.py b/lib/py_edge_interface/carlos/edge/interface/protocol.py index 4a42147d..31edcd7b 100644 --- a/lib/py_edge_interface/carlos/edge/interface/protocol.py +++ b/lib/py_edge_interface/carlos/edge/interface/protocol.py @@ -6,6 +6,7 @@ "EdgeConnectionDisconnected", "EdgeConnectionFailed", "EdgeProtocol", + "EdgeProtocolCallback", "MessageHandler", "PING", "PONG", @@ -14,7 +15,7 @@ import inspect from abc import ABC, abstractmethod from asyncio import sleep -from typing import Protocol, runtime_checkable +from typing import Awaitable, Callable, Protocol, runtime_checkable from loguru import logger @@ -30,10 +31,16 @@ class EdgeConnectionFailed(Exception): """Raised when the connection attempt fails.""" +EdgeProtocolCallback = Callable[["EdgeProtocol"], Awaitable[None]] + + class EdgeProtocol(ABC): """An abstract protocol that defines the necessary operations to be implemented by the server and client.""" + def __init__(self, on_connect: EdgeProtocolCallback | None = None): + self.on_connect = on_connect + @abstractmethod async def send(self, message: CarlosMessage) -> None: """Send data to the other end of the connection. @@ -128,8 +135,8 @@ def register_handlers(self, handlers: dict[MessageType, MessageHandler]): if handler_params != expected_function_params: raise TypeError( - f"Handler {handler} for message type {message_type} does not have the " - f"correct signature." + f"Handler {handler} for message type {message_type} " + f"does not have the correct signature." ) self._handlers.update(handlers) diff --git a/lib/py_edge_interface/carlos/edge/interface/units.py b/lib/py_edge_interface/carlos/edge/interface/units.py new file mode 100644 index 00000000..ab608566 --- /dev/null +++ b/lib/py_edge_interface/carlos/edge/interface/units.py @@ -0,0 +1,51 @@ +__all__ = [ + "PhysicalQuantity", + "UnitOfMeasurement", +] +from enum import IntEnum + +from carlos.edge.interface.utils import add_enum_members_to_docstr + + +class PhysicalQuantity(IntEnum): + """An enumeration of supported physical quantities""" + + IDENTITY = 0 + TEMPERATURE = 1 + HUMIDITY = 2 + ILLUMINANCE = 3 + RATIO = 4 + + +add_enum_members_to_docstr(PhysicalQuantity) + + +class UnitOfMeasurement(IntEnum): + """An enumeration of supported units of measurement. + + The values of this enumeration are based on the PhysicalQuantity enumeration. + """ + + # 0 - 99: IDENTITY + UNIT_LESS = 0 + + # 100 - 199: RATIO + PERCENTAGE = 100 + + # 200 - 299: TEMPERATURE + CELSIUS = 200 + FAHRENHEIT = 201 + + # 300 - 399: HUMIDITY + HUMIDITY_PERCENTAGE = 300 + + # 400 - 499: ILLUMINANCE + LUX = 400 + + @property + def physical_quantity(self) -> PhysicalQuantity: + """Returns the physical quantity of this unit of measurement.""" + return PhysicalQuantity(self.value // 100) + + +add_enum_members_to_docstr(UnitOfMeasurement) diff --git a/lib/py_edge_interface/carlos/edge/interface/units_test.py b/lib/py_edge_interface/carlos/edge/interface/units_test.py new file mode 100644 index 00000000..648dece1 --- /dev/null +++ b/lib/py_edge_interface/carlos/edge/interface/units_test.py @@ -0,0 +1,12 @@ +import pytest + +from .units import PhysicalQuantity, UnitOfMeasurement + + +@pytest.mark.parametrize("unit", list(UnitOfMeasurement)) +def test_unit_of_measurement(unit): + """This test ensures that each unit of measurement can be converted into + a physical quantity. This test does not ensure that the conversion is + correct. This can't be done automatically. Since this is already quite declarative, + we don't need to test the conversion here.""" + assert isinstance(unit.physical_quantity, PhysicalQuantity) diff --git a/lib/py_edge_interface/carlos/edge/interface/utils.py b/lib/py_edge_interface/carlos/edge/interface/utils.py new file mode 100644 index 00000000..d9972337 --- /dev/null +++ b/lib/py_edge_interface/carlos/edge/interface/utils.py @@ -0,0 +1,20 @@ +from enum import Enum + + +def add_enum_members_to_docstr(enum_class: type[Enum]): + """Adds the docstrings of the enum values to the enum class. + + This is useful to have the docstrings of the enum values in the OpenAPI + documentation. + """ + + indentation = " " * 4 + + # the 4 white spaces are needed to have a consistent indentation in the docs + member_doc_prefix = f"\n{indentation}- " + enum_class.__doc__ = ( + enum_class.__doc__ or f"Possible values of {enum_class.__name__}" + ) + "\n" + enum_class.__doc__ += member_doc_prefix + member_doc_prefix.join( + f"{u.value} = {u.name}" for u in enum_class + ) diff --git a/lib/py_edge_interface/pyproject.toml b/lib/py_edge_interface/pyproject.toml index fd16564c..38ad328a 100644 --- a/lib/py_edge_interface/pyproject.toml +++ b/lib/py_edge_interface/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "carlos.edge.interface" -version = "0.1.4" +version = "0.1.5" description = "Shared library to handle the edge communication." authors = ["Felix Fanghanel"] license = "MIT" @@ -23,7 +23,7 @@ requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" [tool.bumpversion] -current_version = "0.1.4" +current_version = "0.1.5" commit = true tag = false parse = "(?P\\d+)\\.(?P\\d+)\\.(?P\\d+)(\\-(?P[a-z0-9\\.]+))?" diff --git a/lib/py_edge_server/carlos/edge/server/connection.py b/lib/py_edge_server/carlos/edge/server/connection.py index 8d8366ed..a6b3e07b 100644 --- a/lib/py_edge_server/carlos/edge/server/connection.py +++ b/lib/py_edge_server/carlos/edge/server/connection.py @@ -21,12 +21,12 @@ def connected_devices(self) -> list[DeviceId]: return list(self._active_connections.keys()) - async def add_device(self, device_id: DeviceId, protocol: EdgeProtocol): + async def add_device(self, protocol: EdgeProtocol, device_id: DeviceId): """Adds the given protocol to the active connections and sends the handshake messages. - :param device_id: The unique identifier of the device. :param protocol: The corresponding protocol of the device. + :param device_id: The unique identifier of the device. """ # if a device with the same id is already connected, disconnect it diff --git a/lib/py_edge_server/pyproject.toml b/lib/py_edge_server/pyproject.toml index fedf4791..0a628ee9 100644 --- a/lib/py_edge_server/pyproject.toml +++ b/lib/py_edge_server/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "carlos.edge.server" -version = "0.1.1" +version = "0.1.2" description = "The library for the edge server of the carlos project." authors = ["Felix Fanghanel"] license = "MIT" @@ -27,7 +27,7 @@ requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" [tool.bumpversion] -current_version = "0.1.1" +current_version = "0.1.2" commit = true tag = false parse = "(?P\\d+)\\.(?P\\d+)\\.(?P\\d+)(\\-(?P[a-z0-9\\.]+))?" diff --git a/services/api/carlos/api/routes/device_server_routes/device_server_routes.py b/services/api/carlos/api/routes/device_server_routes/device_server_routes.py index f522e869..8ec72d84 100644 --- a/services/api/carlos/api/routes/device_server_routes/device_server_routes.py +++ b/services/api/carlos/api/routes/device_server_routes/device_server_routes.py @@ -4,6 +4,7 @@ __all__ = ["device_server_router"] import warnings +from functools import partial from carlos.database.context import RequestContext from carlos.database.device import ensure_device_exists, set_device_seen @@ -96,9 +97,11 @@ async def device_server_websocket( await set_device_seen(context=context, device_id=device_id) - protocol = WebsocketProtocol(websocket) + protocol = WebsocketProtocol( + websocket, + on_connect=partial(DEVICE_CONNECTION_MANAGER.add_device, device_id=device_id), + ) await protocol.connect() # accepts the connection - await DEVICE_CONNECTION_MANAGER.add_device(device_id=device_id, protocol=protocol) try: await ServerEdgeCommunicationHandler( diff --git a/services/api/carlos/api/routes/device_server_routes/protocol.py b/services/api/carlos/api/routes/device_server_routes/protocol.py index deb45927..dac5ba40 100644 --- a/services/api/carlos/api/routes/device_server_routes/protocol.py +++ b/services/api/carlos/api/routes/device_server_routes/protocol.py @@ -5,13 +5,19 @@ EdgeConnectionDisconnected, EdgeProtocol, ) +from carlos.edge.interface.protocol import EdgeProtocolCallback from starlette.websockets import WebSocket, WebSocketDisconnect, WebSocketState class WebsocketProtocol(EdgeProtocol): """A websocket implementation of the EdgeProtocol.""" - def __init__(self, websocket: WebSocket): + def __init__( + self, websocket: WebSocket, on_connect: EdgeProtocolCallback | None = None + ): + + super().__init__(on_connect=on_connect) + self._websocket = websocket async def send(self, message: CarlosMessage) -> None: @@ -46,6 +52,9 @@ async def connect(self): :raises EdgeConnectionFailed: If the connection attempt fails.""" await self._websocket.accept() + if self.on_connect: + await self.on_connect(self) + async def disconnect(self): """Called when the connection is disconnected.""" await self._websocket.close() # pragma: no cover diff --git a/services/api/poetry.lock b/services/api/poetry.lock index dc36da2f..94e17d72 100644 --- a/services/api/poetry.lock +++ b/services/api/poetry.lock @@ -275,7 +275,7 @@ url = "../../lib/py_carlos_database" [[package]] name = "carlos-edge-device" -version = "0.1.9" +version = "0.1.10" description = "The library for the edge device of the carlos project." optional = false python-versions = ">=3.11,<3.12" @@ -302,7 +302,7 @@ url = "../../lib/py_edge_device" [[package]] name = "carlos-edge-interface" -version = "0.1.4" +version = "0.1.5" description = "Shared library to handle the edge communication." optional = false python-versions = ">=3.11,<3.12" diff --git a/services/device/device/websocket.py b/services/device/device/websocket.py index 01c13ed8..912ab953 100644 --- a/services/device/device/websocket.py +++ b/services/device/device/websocket.py @@ -15,7 +15,10 @@ ) from carlos.edge.device.storage.connection import get_storage_engine from carlos.edge.interface import CarlosMessage, EdgeProtocol -from carlos.edge.interface.protocol import EdgeConnectionDisconnected +from carlos.edge.interface.protocol import ( + EdgeConnectionDisconnected, + EdgeProtocolCallback, +) from httpx import AsyncClient from loguru import logger @@ -25,11 +28,18 @@ # can only be tested in integration tests class DeviceWebsocketClient(EdgeProtocol): # pragma: no cover - def __init__(self, settings: ConnectionSettings): + def __init__( + self, + settings: ConnectionSettings, + on_connect: EdgeProtocolCallback | None = None, + ): """Initializes the websocket client. :param settings: The settings of the websocket connection. """ + + super().__init__(on_connect=on_connect) + self._settings = settings self._connection: websockets.WebSocketClientProtocol | None = None @@ -54,6 +64,9 @@ async def connect(self): func=self._do_connect, expected_exceptions=(Exception,) ) + if self.on_connect: + await self.on_connect(self) + logger.info(f"Connected to the server: {self._settings.server_url}") async def _do_connect(self) -> websockets.WebSocketClientProtocol: diff --git a/services/device/poetry.lock b/services/device/poetry.lock index add9898f..0fdfc9c3 100644 --- a/services/device/poetry.lock +++ b/services/device/poetry.lock @@ -185,7 +185,7 @@ test = ["coverage", "freezegun", "pre-commit", "pytest", "pytest-cov", "pytest-m [[package]] name = "carlos-edge-device" -version = "0.1.9" +version = "0.1.10" description = "The library for the edge device of the carlos project." optional = false python-versions = ">=3.11,<3.12" @@ -212,7 +212,7 @@ url = "../../lib/py_edge_device" [[package]] name = "carlos-edge-interface" -version = "0.1.4" +version = "0.1.5" description = "Shared library to handle the edge communication." optional = false python-versions = ">=3.11,<3.12"