diff --git a/odxtools/compumethods/compumethod.py b/odxtools/compumethods/compumethod.py index 9abbe060..5a43fa81 100644 --- a/odxtools/compumethods/compumethod.py +++ b/odxtools/compumethods/compumethod.py @@ -1,7 +1,7 @@ # SPDX-License-Identifier: MIT import abc from dataclasses import dataclass -from typing import List, Literal, Optional +from typing import Literal from ..odxtypes import AtomicOdxType, DataType @@ -35,6 +35,3 @@ def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool: def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool: raise NotImplementedError() - - def get_valid_physical_values(self) -> Optional[List[DataType]]: - return None diff --git a/odxtools/compumethods/createanycompumethod.py b/odxtools/compumethods/createanycompumethod.py index 51d4d275..e4e9ffa5 100644 --- a/odxtools/compumethods/createanycompumethod.py +++ b/odxtools/compumethods/createanycompumethod.py @@ -19,12 +19,12 @@ def _parse_compu_scale_to_linear_compu_method( *, - scale_element, + scale_element: ElementTree.Element, internal_type: DataType, physical_type: DataType, - is_scale_linear=False, - **kwargs, -): + is_scale_linear: bool = False, + **kwargs: Any, +) -> LinearCompuMethod: odxassert(physical_type in [ DataType.A_FLOAT32, DataType.A_FLOAT64, @@ -47,12 +47,12 @@ def _parse_compu_scale_to_linear_compu_method( kwargs["internal_type"] = internal_type kwargs["physical_type"] = physical_type - coeffs = scale_element.find("COMPU-RATIONAL-COEFFS") + coeffs = odxrequire(scale_element.find("COMPU-RATIONAL-COEFFS")) nums = coeffs.iterfind("COMPU-NUMERATOR/V") - offset = computation_python_type(next(nums).text) + offset = computation_python_type(odxrequire(next(nums).text)) factor_el = next(nums, None) - factor = computation_python_type(factor_el.text if factor_el is not None else "0") + factor = computation_python_type(odxrequire(factor_el.text) if factor_el is not None else "0") denominator = 1.0 if (string := coeffs.findtext("COMPU-DENOMINATOR/V")) is not None: denominator = float(string) diff --git a/odxtools/compumethods/identicalcompumethod.py b/odxtools/compumethods/identicalcompumethod.py index 46a57d56..08eb86b7 100644 --- a/odxtools/compumethods/identicalcompumethod.py +++ b/odxtools/compumethods/identicalcompumethod.py @@ -1,6 +1,7 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass +from ..odxtypes import AtomicOdxType from .compumethod import CompuMethod, CompuMethodCategory @@ -11,14 +12,14 @@ class IdenticalCompuMethod(CompuMethod): def category(self) -> CompuMethodCategory: return "IDENTICAL" - def convert_physical_to_internal(self, physical_value): + def convert_physical_to_internal(self, physical_value: AtomicOdxType) -> AtomicOdxType: return physical_value - def convert_internal_to_physical(self, internal_value): + def convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType: return internal_value - def is_valid_physical_value(self, physical_value): + def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool: return self.physical_type.isinstance(physical_value) - def is_valid_internal_value(self, internal_value): + def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool: return self.internal_type.isinstance(internal_value) diff --git a/odxtools/compumethods/linearcompumethod.py b/odxtools/compumethods/linearcompumethod.py index 4e00e1d1..e83518cc 100644 --- a/odxtools/compumethods/linearcompumethod.py +++ b/odxtools/compumethods/linearcompumethod.py @@ -1,9 +1,9 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import Union +from typing import cast -from ..exceptions import odxassert -from ..odxtypes import DataType +from ..exceptions import DecodeError, EncodeError, odxassert +from ..odxtypes import AtomicOdxType, DataType from .compumethod import CompuMethod, CompuMethodCategory from .limit import IntervalType, Limit @@ -73,20 +73,20 @@ def category(self) -> CompuMethodCategory: return "LINEAR" @property - def physical_lower_limit(self): + def physical_lower_limit(self) -> Limit: return self._physical_lower_limit @property - def physical_upper_limit(self): + def physical_upper_limit(self) -> Limit: return self._physical_upper_limit - def __compute_physical_limits(self): + def __compute_physical_limits(self) -> None: """Computes the physical limits and stores them in the properties self._physical_lower_limit and self._physical_upper_limit. This method is only called during the initialization of a LinearCompuMethod. """ - def convert_to_limit_to_physical(limit: Limit, is_upper_limit: bool): + def convert_to_limit_to_physical(limit: Limit, is_upper_limit: bool) -> Limit: """Helper method Parameters: @@ -127,10 +127,14 @@ def convert_to_limit_to_physical(limit: Limit, is_upper_limit: bool): if self.physical_type == DataType.A_UINT32: # If the data type is unsigned, the physical lower limit should be at least 0. if (self._physical_lower_limit.interval_type == IntervalType.INFINITE or - self._physical_lower_limit.value < 0): + cast(float, self._physical_lower_limit.value) < 0): self._physical_lower_limit = Limit(value=0, interval_type=IntervalType.CLOSED) - def _convert_internal_to_physical(self, internal_value): + def _convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType: + if not isinstance(internal_value, (int, float)): + raise DecodeError("The type of internal values of linear compumethods must " + "either int or float") + if self.denominator is None: result = self.offset + self.factor * internal_value else: @@ -143,11 +147,15 @@ def _convert_internal_to_physical(self, internal_value): result = round(result) return self.physical_type.make_from(result) - def convert_internal_to_physical(self, internal_value) -> Union[int, float]: + def convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType: odxassert(self.is_valid_internal_value(internal_value)) return self._convert_internal_to_physical(internal_value) - def convert_physical_to_internal(self, physical_value): + def convert_physical_to_internal(self, physical_value: AtomicOdxType) -> AtomicOdxType: + if not isinstance(physical_value, (int, float)): + raise EncodeError("The type of physical values of linear compumethods must " + "either int or float") + odxassert( self.is_valid_physical_value(physical_value), f"physical value {physical_value} of type {type(physical_value)} " @@ -165,12 +173,12 @@ def convert_physical_to_internal(self, physical_value): result = round(result) return self.internal_type.make_from(result) - def is_valid_physical_value(self, physical_value): + def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool: # Do type checks expected_type = self.physical_type.as_python_type() - if expected_type == float and type(physical_value) not in [int, float]: + if expected_type == float and not isinstance(physical_value, (int, float)): return False - elif expected_type != float and type(physical_value) != expected_type: + elif expected_type != float and not isinstance(physical_value, expected_type): return False # Compare to the limits @@ -180,11 +188,11 @@ def is_valid_physical_value(self, physical_value): return False return True - def is_valid_internal_value(self, internal_value): + def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool: expected_type = self.internal_type.as_python_type() - if expected_type == float and type(internal_value) not in [int, float]: + if expected_type == float and not isinstance(internal_value, (int, float)): return False - elif expected_type != float and type(internal_value) != expected_type: + elif expected_type != float and not isinstance(internal_value, expected_type): return False if not self.internal_lower_limit.complies_to_lower(internal_value): diff --git a/odxtools/compumethods/scalelinearcompumethod.py b/odxtools/compumethods/scalelinearcompumethod.py index dac60dc3..ba726e70 100644 --- a/odxtools/compumethods/scalelinearcompumethod.py +++ b/odxtools/compumethods/scalelinearcompumethod.py @@ -3,6 +3,7 @@ from typing import List from ..exceptions import odxassert +from ..odxtypes import AtomicOdxType from .compumethod import CompuMethod, CompuMethodCategory from .linearcompumethod import LinearCompuMethod @@ -15,24 +16,24 @@ class ScaleLinearCompuMethod(CompuMethod): def category(self) -> CompuMethodCategory: return "SCALE-LINEAR" - def convert_physical_to_internal(self, physical_value): + def convert_physical_to_internal(self, physical_value: AtomicOdxType) -> AtomicOdxType: odxassert( self.is_valid_physical_value(physical_value), - f"cannot convert the invalid physical value {physical_value} " + f"cannot convert the invalid physical value {physical_value!r} " f"of type {type(physical_value)}") lin_method = next( scale for scale in self.linear_methods if scale.is_valid_physical_value(physical_value)) return lin_method.convert_physical_to_internal(physical_value) - def convert_internal_to_physical(self, internal_value): + def convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType: lin_method = next( scale for scale in self.linear_methods if scale.is_valid_internal_value(internal_value)) return lin_method.convert_internal_to_physical(internal_value) - def is_valid_physical_value(self, physical_value): + def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool: return any( True for scale in self.linear_methods if scale.is_valid_physical_value(physical_value)) - def is_valid_internal_value(self, internal_value): + def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool: return any( True for scale in self.linear_methods if scale.is_valid_internal_value(internal_value)) diff --git a/odxtools/compumethods/texttablecompumethod.py b/odxtools/compumethods/texttablecompumethod.py index 60025cf1..d22ea1fb 100644 --- a/odxtools/compumethods/texttablecompumethod.py +++ b/odxtools/compumethods/texttablecompumethod.py @@ -3,7 +3,7 @@ from typing import List, Optional from ..exceptions import DecodeError, EncodeError, odxassert -from ..odxtypes import DataType +from ..odxtypes import AtomicOdxType, DataType from .compumethod import CompuMethod, CompuMethodCategory from .compuscale import CompuScale @@ -28,25 +28,30 @@ def __post_init__(self) -> None: def category(self) -> CompuMethodCategory: return "TEXTTABLE" - def _get_scales(self): + def _get_scales(self) -> List[CompuScale]: scales = list(self.internal_to_phys) if self.compu_default_value: # Default is last, since it's a fallback scales.append(self.compu_default_value) return scales - def convert_physical_to_internal(self, physical_value): - scale: CompuScale = next( - filter(lambda scale: scale.compu_const == physical_value, self._get_scales()), None) - if scale is not None: - res = ( - scale.compu_inverse_value - if scale.compu_inverse_value is not None else scale.lower_limit.value) + def convert_physical_to_internal(self, physical_value: AtomicOdxType) -> AtomicOdxType: + scales = [x for x in self._get_scales() if x.compu_const == physical_value] + if scales: + inverse_value: Optional[AtomicOdxType] = scales[0].compu_inverse_value + if inverse_value is not None: + res = inverse_value + elif scales[0].lower_limit is not None: + res = scales[0].lower_limit.value + else: + res = None + odxassert(self.internal_type.isinstance(res)) - return res - raise EncodeError(f"Texttable compu method could not encode '{physical_value}'.") + return res # type: ignore[return-value] + + raise EncodeError(f"Texttable compu method could not encode '{physical_value!r}'.") - def __is_internal_in_scale(self, internal_value, scale: CompuScale): + def __is_internal_in_scale(self, internal_value: AtomicOdxType, scale: CompuScale) -> bool: if scale == self.compu_default_value: return True if scale.lower_limit is not None and not scale.lower_limit.complies_to_lower( @@ -60,23 +65,23 @@ def __is_internal_in_scale(self, internal_value, scale: CompuScale): # value complies to the defined limits return True - def convert_internal_to_physical(self, internal_value): + def convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType: scale = next( filter( lambda scale: self.__is_internal_in_scale(internal_value, scale), self._get_scales(), ), None) - if scale is None: + if scale is None or scale.compu_const is None: raise DecodeError( - f"Texttable compu method could not decode {internal_value} to string.") + f"Texttable compu method could not decode {internal_value!r} to string.") return scale.compu_const - def is_valid_physical_value(self, physical_value): + def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool: return physical_value in self.get_valid_physical_values() - def is_valid_internal_value(self, internal_value): + def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool: return any( self.__is_internal_in_scale(internal_value, scale) for scale in self._get_scales()) - def get_valid_physical_values(self): + def get_valid_physical_values(self) -> List[Optional[AtomicOdxType]]: return [x.compu_const for x in self._get_scales()] diff --git a/odxtools/dataobjectproperty.py b/odxtools/dataobjectproperty.py index 59665605..0ac9b1b9 100644 --- a/odxtools/dataobjectproperty.py +++ b/odxtools/dataobjectproperty.py @@ -145,10 +145,7 @@ def convert_physical_to_bytes(self, physical_value: Any, encode_state: EncodeSta """ if not self.is_valid_physical_value(physical_value): raise EncodeError(f"The value {repr(physical_value)} of type {type(physical_value)}" - f" is not a valid." + - (f" Valid values are {self.compu_method.get_valid_physical_values()}" - if self.compu_method.get_valid_physical_values( - ) else f" Expected type {self.physical_type.base_data_type.value}.")) + f" is not a valid.") internal_val = self.convert_physical_to_internal(physical_value) return self.diag_coded_type.convert_internal_to_bytes(