Skip to content

Commit

Permalink
feat: Device peripherals (#32)
Browse files Browse the repository at this point in the history
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: flxdot <flxdot@users.noreply.github.com>
  • Loading branch information
3 people authored Apr 19, 2024
1 parent ecaf844 commit 038d421
Show file tree
Hide file tree
Showing 30 changed files with 2,064 additions and 48 deletions.
13 changes: 13 additions & 0 deletions lib/py_carlos_database/.idea/py_carlos_database.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions lib/py_edge_device/carlos/edge/device/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
__all__ = ["DeviceRuntime", "read_config", "DeviceConfig"]
__all__ = [
"DeviceRuntime",
]

from .config import DeviceConfig, read_config
from .runtime import DeviceRuntime
56 changes: 36 additions & 20 deletions lib/py_edge_device/carlos/edge/device/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,21 @@
configuration of the application."""

__all__ = [
"DeviceConfig",
"read_config",
"load_drivers",
"read_config_file",
"write_config",
"write_config_file",
]

from pathlib import Path
from typing import TypeVar

import yaml
from carlos.edge.interface import DeviceId
from pydantic import BaseModel, Field
from carlos.edge.interface.device import CarlosDriver, DriverConfig, DriverFactory
from loguru import logger
from pydantic import BaseModel

from carlos.edge.device.constants import CONFIG_FILE_NAME


class DeviceConfig(BaseModel):
"""Configures the pure device settings."""

device_id: DeviceId = Field(..., description="The unique identifier of the device.")

from carlos.edge.device.driver.device_metrics import DeviceMetrics

Config = TypeVar("Config", bound=BaseModel)

Expand All @@ -50,15 +43,38 @@ def write_config_file(path: Path, config: Config):
)


def read_config() -> DeviceConfig: # pragma: no cover
def load_drivers(config_dir: Path | None = None) -> list[CarlosDriver]:
"""Reads the configuration from the default location."""
config_dir = config_dir or Path.cwd()

return read_config_file(
path=Path.cwd() / CONFIG_FILE_NAME,
schema=DeviceConfig,
)
with open(config_dir / CONFIG_FILE_NAME, "r") as file:
raw_config = yaml.safe_load(file)

factory = DriverFactory()

try:
driver_configs = [factory.build(config) for config in raw_config["drivers"]]
except KeyError: # pragma: no cover
raise KeyError(
f"The configuration file {config_dir / CONFIG_FILE_NAME} must contain "
f"a 'drivers' key."
)

# We always want to have some device metrics
if not any(isinstance(driver, DeviceMetrics) for driver in driver_configs):
driver_configs.insert(
0,
factory.build(
DriverConfig(
identifier="__device_metrics__",
driver_module=DeviceMetrics.__module__,
).model_dump()
),
)

logger.info(
f"Loaded {len(driver_configs)} IOs: "
f"{', '.join(str(io) for io in driver_configs)}"
)

def write_config(config: DeviceConfig): # pragma: no cover
"""Writes the configuration to the default location."""
write_config_file(path=Path.cwd() / CONFIG_FILE_NAME, config=config)
return driver_configs
28 changes: 23 additions & 5 deletions lib/py_edge_device/carlos/edge/device/config_test.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from pathlib import Path
from uuid import uuid4

import pytest
from carlos.edge.interface.device import AnalogInput, DigitalOutput, GpioDriverConfig

from carlos.edge.device.config import DeviceConfig, read_config_file, write_config_file
from carlos.edge.device.config import load_drivers, read_config_file, write_config_file
from tests.test_data import EXPECTED_IO_COUNT, TEST_DEVICE_WORKDIR


def test_config_file_io(tmp_path: Path):
Expand All @@ -12,10 +13,27 @@ def test_config_file_io(tmp_path: Path):
cfg_path = tmp_path / "config"

with pytest.raises(FileNotFoundError):
read_config_file(cfg_path, DeviceConfig)
read_config_file(cfg_path, GpioDriverConfig)

config = DeviceConfig(device_id=uuid4())
config = GpioDriverConfig(
identifier="test-config-file-io",
driver_module="carlos.edge.device.driver.dht11",
direction="input",
pin=7,
)

write_config_file(cfg_path, config)

assert read_config_file(cfg_path, DeviceConfig) == config
assert read_config_file(cfg_path, GpioDriverConfig) == config


def test_load_io():
"""This test ensures that the IOs are loaded correctly."""

io = load_drivers(config_dir=TEST_DEVICE_WORKDIR)

assert len(io) == EXPECTED_IO_COUNT, "The number of IOs does not match."

assert all(
isinstance(io, (AnalogInput, DigitalOutput)) for io in io
), "Not all IOs are of the correct type."
Empty file.
159 changes: 159 additions & 0 deletions lib/py_edge_device/carlos/edge/device/driver/_dhtxx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
__all__ = ["DHTXX", "DhtConfig", "DHTType"]

from abc import ABC
from enum import StrEnum
from time import sleep
from typing import Literal

from carlos.edge.interface.device import AnalogInput, GpioDriverConfig
from pydantic import Field

from carlos.edge.device.protocol import GPIO


class DhtConfig(GpioDriverConfig):
"""Configuration for a DHT sensor."""

direction: Literal["input"] = Field("input")


class DHTType(StrEnum):
DHT11 = "DHT11"
DHT22 = "DHT22"


class DHT:
"""Code for Temperature & Humidity Sensor of Seeed Studio.
Code is originally from, but modified to my needs:
http://wiki.seeedstudio.com/Grove-TemperatureAndHumidity_Sensor/
"""

PULSES_CNT = 41

MAX_CNT = 320

def __init__(self, dht_type: DHTType, pin: int):
"""
:param dht_type: either DHTtype.DHT11 or DHTtype.22
:param pin: gpio pin where the sensor is connected to
"""

# store the pin and type
self._pin = pin
self._dht_type = dht_type

GPIO.setup(self._pin, GPIO.OUT)

def read(self) -> tuple[float, float]:
"""Internal read method.
http://www.ocfreaks.com/basics-interfacing-dht11-dht22-humidity-temperature-sensor-mcu/
:returns (humidity in %, temperature in °C)"""

# Send Falling signal to trigger sensor output data
# Wait for 20ms to collect 42 bytes data
GPIO.setup(self._pin, GPIO.OUT)
GPIO.output(self._pin, GPIO.HIGH)
sleep(0.2)
GPIO.output(self._pin, GPIO.LOW)
sleep(0.018)

GPIO.setup(self._pin, GPIO.IN)

# a short delay needed
for _ in range(10):
pass

# pullup by host 20-40 us
count = 0
while GPIO.input(self._pin):
count += 1
if count > self.MAX_CNT:
raise RuntimeError("pullup by host 20-40us failed")

pulse_cnt = [0] * (2 * self.PULSES_CNT)
for pulse in range(0, self.PULSES_CNT * 2, 2):
while not GPIO.input(self._pin):
pulse_cnt[pulse] += 1
if pulse_cnt[pulse] > self.MAX_CNT:
raise RuntimeError(f"pulldown by DHT timeout: {pulse}")

while GPIO.input(self._pin):
pulse_cnt[pulse + 1] += 1
if pulse_cnt[pulse + 1] > self.MAX_CNT:
if pulse == (self.PULSES_CNT - 1) * 2:
pass
raise RuntimeError(f"pullup by DHT timeout: {pulse}")

total_cnt = 0
for pulse in range(2, 2 * self.PULSES_CNT, 2):
total_cnt += pulse_cnt[pulse]

# Low level (50 us) average counter
average_cnt = total_cnt / (self.PULSES_CNT - 1)

data = ""
for pulse in range(3, 2 * self.PULSES_CNT, 2):
if pulse_cnt[pulse] > average_cnt:
data += "1"
else:
data += "0"

byte0 = int(data[0:8], 2)
byte1 = int(data[8:16], 2)
byte2 = int(data[16:24], 2)
byte3 = int(data[24:32], 2)
crc_byte = int(data[32:40], 2)

data_checksum = (byte0 + byte1 + byte2 + byte3) & 0xFF
if crc_byte != data_checksum:
raise RuntimeError("checksum error!")

if self._dht_type == DHTType.DHT11:
humidity = float(byte0)
temperature = float(byte2)
else:
humidity = float(int(data[0:16], 2) * 0.1)
temperature = float(int(data[17:32], 2) * 0.2 * (0.5 - int(data[16], 2)))

return temperature, humidity


class DHTXX(AnalogInput, ABC):
"""DHTXX Temperature and Humidity Sensor."""

def __init__(self, config: GpioDriverConfig):

super().__init__(config=config)

self._dht: DHT | None = None
self._dht_type: DHTType | None = None

def setup(self):
"""Sets up the DHT11 sensor."""

self._dht = DHT(dht_type=self._dht_type, pin=self.config.pin)

def read(self) -> dict[str, float]:
"""Reads the temperature and humidity."""

assert self._dht is not None, "The DHT sensor has not been initialized."

# Reading the DHT sensor is quite unreliable, as the device is not a real-time
# device. Thus, we just try it a couple of times and fail if it does not work.
last_error: Exception | None = None
for i in range(16):
try:
temperature, humidity = self._dht.read()
return {
"temperature": temperature,
"humidity": humidity,
}
except RuntimeError as ex:
last_error = ex

assert last_error is not None
raise last_error
37 changes: 37 additions & 0 deletions lib/py_edge_device/carlos/edge/device/driver/device_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import psutil
from carlos.edge.interface.device import AnalogInput, DriverConfig, DriverFactory


class DeviceMetrics(AnalogInput):
"""Provides the metrics of the device."""

def __init__(self, config: DriverConfig):

super().__init__(config=config)

def setup(self):
pass

def read(self) -> dict[str, float]:
"""Reads the device metrics."""

return {
"cpu.load_percent": psutil.cpu_percent(interval=1.0),
"cpu.temperature": self._read_cpu_temp(),
"memory.usage_percent": psutil.virtual_memory().percent,
"disk.usage_percent": psutil.disk_usage("/").percent,
}

@staticmethod
def _read_cpu_temp() -> float:
"""Reads the CPU temperature."""
try:
with open("/sys/class/thermal/thermal_zone0/temp") as f:
return float(f.read().strip()) / 1000
except FileNotFoundError:
return 0.0


DriverFactory().register(
driver_module=__name__, config=DriverConfig, factory=DeviceMetrics
)
16 changes: 16 additions & 0 deletions lib/py_edge_device/carlos/edge/device/driver/dht11.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from carlos.edge.interface.device import DriverFactory, GpioDriverConfig

from ._dhtxx import DHTXX, DhtConfig, DHTType


class DHT11(DHTXX):
"""DHT11 Temperature and Humidity Sensor."""

def __init__(self, config: GpioDriverConfig):

super().__init__(config=config)

self._dht_type = DHTType.DHT11


DriverFactory().register(driver_module=__name__, config=DhtConfig, factory=DHT11)
16 changes: 16 additions & 0 deletions lib/py_edge_device/carlos/edge/device/driver/dht22.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from carlos.edge.interface.device import DriverFactory, GpioDriverConfig

from ._dhtxx import DHTXX, DhtConfig, DHTType


class DHT22(DHTXX):
"""DHT22 Temperature and Humidity Sensor."""

def __init__(self, config: GpioDriverConfig):

super().__init__(config=config)

self._dht_type = DHTType.DHT22


DriverFactory().register(driver_module=__name__, config=DhtConfig, factory=DHT22)
Loading

0 comments on commit 038d421

Please sign in to comment.