diff --git a/docs/changelog.md b/docs/changelog.md index b3e6188d1..335c68ee7 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -6,6 +6,9 @@ nav_order: 2 # Changelog +### Devices + +- Weather: Support either DPT 9.006 (2byte) or DPT 14.058 (4byte) for `group_address_air_pressure` ### DPT diff --git a/test/devices_tests/weather_test.py b/test/devices_tests/weather_test.py index 883f3f0ec..fa6937ad7 100644 --- a/test/devices_tests/weather_test.py +++ b/test/devices_tests/weather_test.py @@ -2,6 +2,8 @@ import datetime +import pytest + from xknx import XKNX from xknx.devices import Weather from xknx.devices.weather import WeatherCondition @@ -87,35 +89,42 @@ async def test_brightness(self): assert weather._brightness_north.unit_of_measurement == "lx" assert weather._brightness_north.ha_device_class == "illuminance" - async def test_pressure(self): - """Test resolve state with pressure.""" + @pytest.mark.parametrize( + ("value", "payload"), + [ + (98631.68, DPTArray((0x6C, 0xB4))), # 2byte float + (98631.68, DPTArray((0x47, 0xC0, 0xA3, 0xD7))), # 4byte float + ], + ) + async def test_pressure(self, value, payload): + """Test air pressure telegram.""" xknx = XKNX() weather = Weather(name="weather", xknx=xknx, group_address_air_pressure="1/3/4") weather.process( Telegram( destination_address=GroupAddress("1/3/4"), - payload=GroupValueWrite(value=DPTArray((0x6C, 0xAD))), + payload=GroupValueWrite(value=payload), ) ) - assert weather.air_pressure == 98058.24 + assert weather.air_pressure == value assert weather._air_pressure.unit_of_measurement == "Pa" assert weather._air_pressure.ha_device_class == "pressure" async def test_humidity(self): - """Test humidity.""" + """Test humidity telegram.""" xknx = XKNX() weather = Weather(name="weather", xknx=xknx, group_address_humidity="1/2/4") weather.process( Telegram( destination_address=GroupAddress("1/2/4"), - payload=GroupValueWrite(value=DPTArray((0x7E, 0xE1))), + payload=GroupValueWrite(value=DPTArray((0x15, 0x73))), ) ) - assert weather.humidity == 577044.48 + assert weather.humidity == 55.8 assert weather._humidity.unit_of_measurement == "%" assert weather._humidity.ha_device_class == "humidity" diff --git a/xknx/devices/weather.py b/xknx/devices/weather.py index b7e625f99..773b55b0d 100644 --- a/xknx/devices/weather.py +++ b/xknx/devices/weather.py @@ -20,9 +20,11 @@ from enum import Enum from typing import TYPE_CHECKING, Any +from xknx.dpt import DPTPressure, DPTPressure2Byte from xknx.remote_value import ( GroupAddressesType, RemoteValue, + RemoteValueByLength, RemoteValueNumeric, RemoteValueSwitch, ) @@ -211,11 +213,11 @@ def __init__( after_update_cb=self.after_update, ) - self._air_pressure = RemoteValueNumeric( + self._air_pressure = RemoteValueByLength( xknx, + dpt_classes=(DPTPressure, DPTPressure2Byte), group_address_state=group_address_air_pressure, sync_state=sync_state, - value_type="pressure_2byte", device_name=self.name, feature_name="Air pressure", after_update_cb=self.after_update, diff --git a/xknx/remote_value/__init__.py b/xknx/remote_value/__init__.py index 009211f2a..486df06a7 100644 --- a/xknx/remote_value/__init__.py +++ b/xknx/remote_value/__init__.py @@ -1,6 +1,7 @@ """Module for handling values on the KNX bus.""" from .remote_value import GroupAddressesType, RemoteValue +from .remote_value_by_length import RemoteValueByLength from .remote_value_climate_mode import ( RemoteValueBinaryHeatCool, RemoteValueBinaryOperationMode, @@ -31,6 +32,7 @@ "RemoteValue", "RemoteValueBinaryHeatCool", "RemoteValueBinaryOperationMode", + "RemoteValueByLength", "RemoteValueColorRGB", "RemoteValueColorRGBW", "RemoteValueColorXYY", diff --git a/xknx/remote_value/remote_value_by_length.py b/xknx/remote_value/remote_value_by_length.py new file mode 100644 index 000000000..2a5f252b8 --- /dev/null +++ b/xknx/remote_value/remote_value_by_length.py @@ -0,0 +1,102 @@ +"""Module for managing remote value with payload length based DPT detection.""" + +from __future__ import annotations + +from collections.abc import Iterable +from typing import TYPE_CHECKING + +from xknx.dpt import DPTArray, DPTBinary, DPTNumeric +from xknx.exceptions import ConversionError, CouldNotParseTelegram + +from .remote_value import GroupAddressesType, RemoteValue, RVCallbackType + +if TYPE_CHECKING: + from xknx.xknx import XKNX + + +class RemoteValueByLength(RemoteValue[float]): + """RemoteValue with DPT detection based on payload length of first received value.""" + + def __init__( + self, + xknx: XKNX, + dpt_classes: Iterable[type[DPTNumeric]], + group_address: GroupAddressesType = None, + group_address_state: GroupAddressesType = None, + sync_state: bool | int | float | str = True, + device_name: str | None = None, + feature_name: str | None = None, + after_update_cb: RVCallbackType[float] | None = None, + ): + """Initialize RemoteValueByLength class.""" + _payload_lengths = set() + for dpt_class in dpt_classes: + if ( + not issubclass(dpt_class, DPTNumeric) + or dpt_class.payload_type is not DPTArray + ): + raise ConversionError( + "Only DPTNumeric subclasses with payload_type DPTArray are supported" + ) + if dpt_class.payload_length in _payload_lengths: + raise ConversionError( + f"Duplicate payload_length {dpt_class.payload_length} in {dpt_classes}" + ) + _payload_lengths.add(dpt_class.payload_length) + + super().__init__( + xknx, + group_address, + group_address_state, + sync_state=sync_state, + device_name=device_name, + feature_name=feature_name, + after_update_cb=after_update_cb, + ) + + self._dpt_classes = dpt_classes + self._internal_dpt_class: type[DPTNumeric] | None = None + + def to_knx(self, value: float) -> DPTArray: + """Convert value to payload.""" + if self._internal_dpt_class is None: + raise ConversionError( + f"RemoteValue DPT not initialized for {self.device_name}" + ) + return self._internal_dpt_class.to_knx(value) + + def from_knx(self, payload: DPTArray | DPTBinary) -> float: + """Convert current payload to value.""" + if self._internal_dpt_class is None: + self._internal_dpt_class = self._determine_dpt_class(payload) + + return self._internal_dpt_class.from_knx(payload) + + @property + def unit_of_measurement(self) -> str | None: + """Return the unit of measurement.""" + if not self._internal_dpt_class: + return None + return self._internal_dpt_class.unit + + @property + def ha_device_class(self) -> str | None: + """Return a string representing the home assistant device class.""" + if not self._internal_dpt_class: + return None + return getattr(self._internal_dpt_class, "ha_device_class", None) + + def _determine_dpt_class(self, payload: DPTArray | DPTBinary) -> type[DPTNumeric]: + """Test if telegram payload may be parsed.""" + if isinstance(payload, DPTArray): + try: + return next( + dpt_class + for dpt_class in self._dpt_classes + if dpt_class.payload_type is DPTArray + and dpt_class.payload_length == len(payload.value) + ) + except StopIteration: + pass + + raise CouldNotParseTelegram("Payload invalid", payload=str(payload))