Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: send device metadata #45

Merged
merged 11 commits into from
Apr 28, 2024
9 changes: 7 additions & 2 deletions lib/py_edge_device/carlos/edge/device/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from typing import TypeVar

import yaml
from carlos.edge.interface.device import CarlosDriver, DriverConfig, DriverFactory
from carlos.edge.interface.device import CarlosDriver, DriverFactory
from carlos.edge.interface.device.driver_config import (
DriverConfigWithDirection,
DriverDirection,
)
from loguru import logger
from pydantic import BaseModel

Expand Down Expand Up @@ -65,9 +69,10 @@ def load_drivers(config_dir: Path | None = None) -> list[CarlosDriver]:
driver_configs.insert(
0,
factory.build(
DriverConfig(
DriverConfigWithDirection(
identifier="__device_metrics__",
driver_module=DeviceMetrics.__module__,
direction=DriverDirection.INPUT,
).model_dump()
),
)
Expand Down
23 changes: 21 additions & 2 deletions lib/py_edge_device/carlos/edge/device/driver/_dhtxx.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from time import sleep

from carlos.edge.interface.device import AnalogInput, DriverDirection, GpioDriverConfig
from carlos.edge.interface.device.driver_config import DriverSignal
from carlos.edge.interface.units import UnitOfMeasurement
from pydantic import Field

from carlos.edge.device.protocol import GPIO
Expand Down Expand Up @@ -124,13 +126,30 @@ def read(self) -> tuple[float, float]:
class DHTXX(AnalogInput, ABC):
"""DHTXX Temperature and Humidity Sensor."""

TEMP_SIGNAL_ID = "temperature"
HUMIDITY_SIGNAL_ID = "humidity"

def __init__(self, config: GpioDriverConfig):

super().__init__(config=config)

self._dht: DHT | None = None
self._dht_type: DHTType | None = None

def signals(self) -> list[DriverSignal]:
"""Returns the signals of the DHT sensor."""

return [
DriverSignal(
signal_identifier=self.TEMP_SIGNAL_ID,
unit_of_measurement=UnitOfMeasurement.CELSIUS,
),
DriverSignal(
signal_identifier=self.HUMIDITY_SIGNAL_ID,
unit_of_measurement=UnitOfMeasurement.HUMIDITY_PERCENTAGE,
),
]

def setup(self):
"""Sets up the DHT11 sensor."""

Expand All @@ -148,8 +167,8 @@ def read(self) -> dict[str, float]:
try:
temperature, humidity = self._dht.read()
return {
"temperature": temperature,
"humidity": humidity,
self.TEMP_SIGNAL_ID: temperature,
self.HUMIDITY_SIGNAL_ID: humidity,
}
except RuntimeError as ex:
last_error = ex
Expand Down
46 changes: 39 additions & 7 deletions lib/py_edge_device/carlos/edge/device/driver/device_metrics.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,57 @@
import psutil
from carlos.edge.interface.device import AnalogInput, DriverConfig, DriverFactory
from carlos.edge.interface.device import AnalogInput, DriverFactory
from carlos.edge.interface.device.driver_config import (
DriverConfigWithDirection,
DriverSignal,
)
from carlos.edge.interface.units import UnitOfMeasurement


class DeviceMetrics(AnalogInput):
"""Provides the metrics of the device."""

def __init__(self, config: DriverConfig):
_CPU_LOAD_SIGNAL_ID = "cpu.load_percent"
_CPU_TEMP_SIGNAL_ID = "cpu.temperature"
_MEMORY_USAGE_SIGNAL_ID = "memory.usage_percent"
_DISK_USAGE_SIGNAL_ID = "disk.usage_percent"

def __init__(self, config: DriverConfigWithDirection):

super().__init__(config=config)

def signals(self) -> list[DriverSignal]:
"""Returns the signals of the DHT sensor."""

return [
DriverSignal(
signal_identifier=self._CPU_LOAD_SIGNAL_ID,
unit_of_measurement=UnitOfMeasurement.PERCENTAGE,
),
DriverSignal(
signal_identifier=self._CPU_TEMP_SIGNAL_ID,
unit_of_measurement=UnitOfMeasurement.CELSIUS,
),
DriverSignal(
signal_identifier=self._MEMORY_USAGE_SIGNAL_ID,
unit_of_measurement=UnitOfMeasurement.PERCENTAGE,
),
DriverSignal(
signal_identifier=self._DISK_USAGE_SIGNAL_ID,
unit_of_measurement=UnitOfMeasurement.PERCENTAGE,
),
]

def setup(self):
pass

def read(self) -> dict[str, float]:
"""Reads the device metrics."""

return {
"cpu.load_percent": psutil.cpu_percent(interval=1.0),
"cpu.temperature": self._read_cpu_temp(),
"memory.usage_percent": psutil.virtual_memory().percent,
"disk.usage_percent": psutil.disk_usage("/").percent,
self._CPU_LOAD_SIGNAL_ID: psutil.cpu_percent(interval=1.0),
self._CPU_TEMP_SIGNAL_ID: self._read_cpu_temp(),
self._MEMORY_USAGE_SIGNAL_ID: psutil.virtual_memory().percent,
self._DISK_USAGE_SIGNAL_ID: psutil.disk_usage("/").percent,
}

@staticmethod
Expand All @@ -33,5 +65,5 @@ def _read_cpu_temp() -> float:


DriverFactory().register(
driver_module=__name__, config=DriverConfig, factory=DeviceMetrics
driver_module=__name__, config=DriverConfigWithDirection, factory=DeviceMetrics
)
22 changes: 18 additions & 4 deletions lib/py_edge_device/carlos/edge/device/driver/relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
DriverFactory,
GpioDriverConfig,
)
from carlos.edge.interface.device.driver_config import DriverSignal
from carlos.edge.interface.units import UnitOfMeasurement
from pydantic import Field

from carlos.edge.device.protocol import GPIO
Expand All @@ -20,11 +22,23 @@ class RelayConfig(GpioDriverConfig):
class Relay(DigitalOutput, DigitalInput):
"""Relay."""

_STATE_SIGNAL_ID = "state"

def __init__(self, config: RelayConfig):
super().__init__(config=config)

self._state = False

def signals(self) -> list[DriverSignal]:
"""Returns the signals of the DHT sensor."""

return [
DriverSignal(
signal_identifier=self._STATE_SIGNAL_ID,
unit_of_measurement=UnitOfMeasurement.UNIT_LESS,
),
]

def setup(self):

# HIGH means off, LOW means
Expand All @@ -42,27 +56,27 @@ def set(self, value: bool):
def read(self) -> dict[str, bool]:
"""Reads the value of the relay."""

return {"state": self._state}
return {self._STATE_SIGNAL_ID: self._state}

def test(self):
"""Tests the relay by reading the value."""

self.set(False)
time.sleep(0.01)
state = self.read()
if state["state"]:
if state[self._STATE_SIGNAL_ID]:
raise ValueError(f"Value of relay was not set to false. Got: {state}")

self.set(True)
time.sleep(1)
state = self.read()
if not state["state"]:
if not state[self._STATE_SIGNAL_ID]:
raise ValueError(f"Value of relay was not set to true. Got: {state}")

self.set(False)
time.sleep(0.01)
state = self.read()
if state["state"]:
if state[self._STATE_SIGNAL_ID]:
raise ValueError(f"Value of relay was not set to false. Got: {state}")


Expand Down
23 changes: 21 additions & 2 deletions lib/py_edge_device/carlos/edge/device/driver/sht30.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
DriverFactory,
I2cDriverConfig,
)
from carlos.edge.interface.device.driver_config import DriverSignal
from carlos.edge.interface.units import UnitOfMeasurement
from pydantic import Field

from carlos.edge.device.protocol import I2C
Expand Down Expand Up @@ -37,6 +39,9 @@ class SHT30(AnalogInput):
PARAM_HIGH_REPEATABLITY = 0x06
"""Marks the measurement as high repeatability."""

_TEMPERATURE_SIGNAL_ID = "temperature"
_HUMIDITY_SIGNAL_ID = "humidity"

def __init__(self, config: SHT30Config):
if config.address_int not in SHT30.I2C_ADDRESSES:
raise ValueError(
Expand All @@ -48,6 +53,20 @@ def __init__(self, config: SHT30Config):

self._i2c: I2C | None = None

def signals(self) -> list[DriverSignal]:
"""Returns the signals of the DHT sensor."""

return [
DriverSignal(
signal_identifier=self._TEMPERATURE_SIGNAL_ID,
unit_of_measurement=UnitOfMeasurement.CELSIUS,
),
DriverSignal(
signal_identifier=self._HUMIDITY_SIGNAL_ID,
unit_of_measurement=UnitOfMeasurement.HUMIDITY_PERCENTAGE,
),
]

def setup(self):
self._i2c = I2C(address=self.config.address_int)

Expand All @@ -57,8 +76,8 @@ def read(self) -> dict[str, float]:
read_delay_ms = 100
humidity, temperature = self._get_measurement(read_delay_ms=read_delay_ms)
return {
"temperature": float(temperature),
"humidity": float(humidity),
self._TEMPERATURE_SIGNAL_ID: float(temperature),
self._HUMIDITY_SIGNAL_ID: float(humidity),
}

def _get_measurement(self, read_delay_ms: int = 100) -> tuple[float, float]:
Expand Down
44 changes: 39 additions & 5 deletions lib/py_edge_device/carlos/edge/device/driver/si1145.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
DriverFactory,
I2cDriverConfig,
)
from carlos.edge.interface.device.driver_config import DriverSignal
from carlos.edge.interface.units import UnitOfMeasurement
from pydantic import Field

from carlos.edge.device.protocol import I2C
Expand All @@ -21,6 +23,12 @@ class Si1145Config(I2cDriverConfig):

class SI1145(AnalogInput):

_VISUAL_LIGHT_SIGNAL_ID = "visual-light"
_VISUAL_LIGHT_RAW_SIGNAL_ID = "visual-light-raw"
_INFRARED_LIGHT_SIGNAL_ID = "infrared-light"
_INFRARED_LIGHT_RAW_SIGNAL_ID = "infrared-light-raw"
_UV_INDEX_SIGNAL_ID = "uv-index"

def __init__(self, config: Si1145Config):

if config.address_int != SDL_Pi_SI1145.ADDR:
Expand All @@ -32,6 +40,32 @@ def __init__(self, config: Si1145Config):

self._si1145: SDL_Pi_SI1145 | None = None

def signals(self) -> list[DriverSignal]:
"""Returns the signals of the DHT sensor."""

return [
DriverSignal(
signal_identifier=self._VISUAL_LIGHT_SIGNAL_ID,
unit_of_measurement=UnitOfMeasurement.LUX,
),
DriverSignal(
signal_identifier=self._VISUAL_LIGHT_RAW_SIGNAL_ID,
unit_of_measurement=UnitOfMeasurement.UNIT_LESS,
),
DriverSignal(
signal_identifier=self._INFRARED_LIGHT_SIGNAL_ID,
unit_of_measurement=UnitOfMeasurement.LUX,
),
DriverSignal(
signal_identifier=self._INFRARED_LIGHT_RAW_SIGNAL_ID,
unit_of_measurement=UnitOfMeasurement.UNIT_LESS,
),
DriverSignal(
signal_identifier=self._UV_INDEX_SIGNAL_ID,
unit_of_measurement=UnitOfMeasurement.UNIT_LESS,
),
]

def setup(self):

self._si1145 = SDL_Pi_SI1145()
Expand All @@ -48,11 +82,11 @@ def read(self) -> dict[str, float]:
uv_idx = self._si1145.read_uv_index()

return {
"visual-light-raw": float(vis_raw),
"visual-light": float(vis_lux),
"infrared-light-raw": float(ir_raw),
"infrared-light": float(ir_lux),
"uv-index": float(uv_idx),
self._VISUAL_LIGHT_RAW_SIGNAL_ID: float(vis_raw),
self._VISUAL_LIGHT_SIGNAL_ID: float(vis_lux),
self._INFRARED_LIGHT_RAW_SIGNAL_ID: float(ir_raw),
self._INFRARED_LIGHT_SIGNAL_ID: float(ir_lux),
self._UV_INDEX_SIGNAL_ID: float(uv_idx),
}


Expand Down
Loading
Loading