Skip to content

Commit

Permalink
compu methods: add/fix type annotations
Browse files Browse the repository at this point in the history
Signed-off-by: Andreas Lauser <andreas.lauser@mbition.io>
Signed-off-by: Christian Hackenbeck <christian.hackenbeck@mbition.io>
Signed-off-by: Alexander Walz <alexander.walz@mbition.io>
  • Loading branch information
andlaus committed Sep 27, 2023
1 parent 3bc3f60 commit 1b4b4b0
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 59 deletions.
5 changes: 1 addition & 4 deletions odxtools/compumethods/compumethod.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# SPDX-License-Identifier: MIT
import abc
from dataclasses import dataclass
from typing import List, Literal, Optional
from typing import Literal

from ..odxtypes import AtomicOdxType, DataType

Expand Down Expand Up @@ -35,6 +35,3 @@ def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool:

def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool:
raise NotImplementedError()

def get_valid_physical_values(self) -> Optional[List[DataType]]:
return None
14 changes: 7 additions & 7 deletions odxtools/compumethods/createanycompumethod.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@

def _parse_compu_scale_to_linear_compu_method(
*,
scale_element,
scale_element: ElementTree.Element,
internal_type: DataType,
physical_type: DataType,
is_scale_linear=False,
**kwargs,
):
is_scale_linear: bool = False,
**kwargs: Any,
) -> LinearCompuMethod:
odxassert(physical_type in [
DataType.A_FLOAT32,
DataType.A_FLOAT64,
Expand All @@ -47,12 +47,12 @@ def _parse_compu_scale_to_linear_compu_method(
kwargs["internal_type"] = internal_type
kwargs["physical_type"] = physical_type

coeffs = scale_element.find("COMPU-RATIONAL-COEFFS")
coeffs = odxrequire(scale_element.find("COMPU-RATIONAL-COEFFS"))
nums = coeffs.iterfind("COMPU-NUMERATOR/V")

offset = computation_python_type(next(nums).text)
offset = computation_python_type(odxrequire(next(nums).text))
factor_el = next(nums, None)
factor = computation_python_type(factor_el.text if factor_el is not None else "0")
factor = computation_python_type(odxrequire(factor_el.text) if factor_el is not None else "0")
denominator = 1.0
if (string := coeffs.findtext("COMPU-DENOMINATOR/V")) is not None:
denominator = float(string)
Expand Down
9 changes: 5 additions & 4 deletions odxtools/compumethods/identicalcompumethod.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass

from ..odxtypes import AtomicOdxType
from .compumethod import CompuMethod, CompuMethodCategory


Expand All @@ -11,14 +12,14 @@ class IdenticalCompuMethod(CompuMethod):
def category(self) -> CompuMethodCategory:
return "IDENTICAL"

def convert_physical_to_internal(self, physical_value):
def convert_physical_to_internal(self, physical_value: AtomicOdxType) -> AtomicOdxType:
return physical_value

def convert_internal_to_physical(self, internal_value):
def convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType:
return internal_value

def is_valid_physical_value(self, physical_value):
def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool:
return self.physical_type.isinstance(physical_value)

def is_valid_internal_value(self, internal_value):
def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool:
return self.internal_type.isinstance(internal_value)
42 changes: 25 additions & 17 deletions odxtools/compumethods/linearcompumethod.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from typing import Union
from typing import cast

from ..exceptions import odxassert
from ..odxtypes import DataType
from ..exceptions import DecodeError, EncodeError, odxassert
from ..odxtypes import AtomicOdxType, DataType
from .compumethod import CompuMethod, CompuMethodCategory
from .limit import IntervalType, Limit

Expand Down Expand Up @@ -73,20 +73,20 @@ def category(self) -> CompuMethodCategory:
return "LINEAR"

@property
def physical_lower_limit(self):
def physical_lower_limit(self) -> Limit:
return self._physical_lower_limit

@property
def physical_upper_limit(self):
def physical_upper_limit(self) -> Limit:
return self._physical_upper_limit

def __compute_physical_limits(self):
def __compute_physical_limits(self) -> None:
"""Computes the physical limits and stores them in the properties
self._physical_lower_limit and self._physical_upper_limit.
This method is only called during the initialization of a LinearCompuMethod.
"""

def convert_to_limit_to_physical(limit: Limit, is_upper_limit: bool):
def convert_to_limit_to_physical(limit: Limit, is_upper_limit: bool) -> Limit:
"""Helper method
Parameters:
Expand Down Expand Up @@ -127,10 +127,14 @@ def convert_to_limit_to_physical(limit: Limit, is_upper_limit: bool):
if self.physical_type == DataType.A_UINT32:
# If the data type is unsigned, the physical lower limit should be at least 0.
if (self._physical_lower_limit.interval_type == IntervalType.INFINITE or
self._physical_lower_limit.value < 0):
cast(float, self._physical_lower_limit.value) < 0):
self._physical_lower_limit = Limit(value=0, interval_type=IntervalType.CLOSED)

def _convert_internal_to_physical(self, internal_value):
def _convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType:
if not isinstance(internal_value, (int, float)):
raise DecodeError("The type of internal values of linear compumethods must "
"either int or float")

if self.denominator is None:
result = self.offset + self.factor * internal_value
else:
Expand All @@ -143,11 +147,15 @@ def _convert_internal_to_physical(self, internal_value):
result = round(result)
return self.physical_type.make_from(result)

def convert_internal_to_physical(self, internal_value) -> Union[int, float]:
def convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType:
odxassert(self.is_valid_internal_value(internal_value))
return self._convert_internal_to_physical(internal_value)

def convert_physical_to_internal(self, physical_value):
def convert_physical_to_internal(self, physical_value: AtomicOdxType) -> AtomicOdxType:
if not isinstance(physical_value, (int, float)):
raise EncodeError("The type of physical values of linear compumethods must "
"either int or float")

odxassert(
self.is_valid_physical_value(physical_value),
f"physical value {physical_value} of type {type(physical_value)} "
Expand All @@ -165,12 +173,12 @@ def convert_physical_to_internal(self, physical_value):
result = round(result)
return self.internal_type.make_from(result)

def is_valid_physical_value(self, physical_value):
def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool:
# Do type checks
expected_type = self.physical_type.as_python_type()
if expected_type == float and type(physical_value) not in [int, float]:
if expected_type == float and not isinstance(physical_value, (int, float)):
return False
elif expected_type != float and type(physical_value) != expected_type:
elif expected_type != float and not isinstance(physical_value, expected_type):
return False

# Compare to the limits
Expand All @@ -180,11 +188,11 @@ def is_valid_physical_value(self, physical_value):
return False
return True

def is_valid_internal_value(self, internal_value):
def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool:
expected_type = self.internal_type.as_python_type()
if expected_type == float and type(internal_value) not in [int, float]:
if expected_type == float and not isinstance(internal_value, (int, float)):
return False
elif expected_type != float and type(internal_value) != expected_type:
elif expected_type != float and not isinstance(internal_value, expected_type):
return False

if not self.internal_lower_limit.complies_to_lower(internal_value):
Expand Down
11 changes: 6 additions & 5 deletions odxtools/compumethods/scalelinearcompumethod.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import List

from ..exceptions import odxassert
from ..odxtypes import AtomicOdxType
from .compumethod import CompuMethod, CompuMethodCategory
from .linearcompumethod import LinearCompuMethod

Expand All @@ -15,24 +16,24 @@ class ScaleLinearCompuMethod(CompuMethod):
def category(self) -> CompuMethodCategory:
return "SCALE-LINEAR"

def convert_physical_to_internal(self, physical_value):
def convert_physical_to_internal(self, physical_value: AtomicOdxType) -> AtomicOdxType:
odxassert(
self.is_valid_physical_value(physical_value),
f"cannot convert the invalid physical value {physical_value} "
f"cannot convert the invalid physical value {physical_value!r} "
f"of type {type(physical_value)}")
lin_method = next(
scale for scale in self.linear_methods if scale.is_valid_physical_value(physical_value))
return lin_method.convert_physical_to_internal(physical_value)

def convert_internal_to_physical(self, internal_value):
def convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType:
lin_method = next(
scale for scale in self.linear_methods if scale.is_valid_internal_value(internal_value))
return lin_method.convert_internal_to_physical(internal_value)

def is_valid_physical_value(self, physical_value):
def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool:
return any(
True for scale in self.linear_methods if scale.is_valid_physical_value(physical_value))

def is_valid_internal_value(self, internal_value):
def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool:
return any(
True for scale in self.linear_methods if scale.is_valid_internal_value(internal_value))
41 changes: 23 additions & 18 deletions odxtools/compumethods/texttablecompumethod.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import List, Optional

from ..exceptions import DecodeError, EncodeError, odxassert
from ..odxtypes import DataType
from ..odxtypes import AtomicOdxType, DataType
from .compumethod import CompuMethod, CompuMethodCategory
from .compuscale import CompuScale

Expand All @@ -28,25 +28,30 @@ def __post_init__(self) -> None:
def category(self) -> CompuMethodCategory:
return "TEXTTABLE"

def _get_scales(self):
def _get_scales(self) -> List[CompuScale]:
scales = list(self.internal_to_phys)
if self.compu_default_value:
# Default is last, since it's a fallback
scales.append(self.compu_default_value)
return scales

def convert_physical_to_internal(self, physical_value):
scale: CompuScale = next(
filter(lambda scale: scale.compu_const == physical_value, self._get_scales()), None)
if scale is not None:
res = (
scale.compu_inverse_value
if scale.compu_inverse_value is not None else scale.lower_limit.value)
def convert_physical_to_internal(self, physical_value: AtomicOdxType) -> AtomicOdxType:
scales = [x for x in self._get_scales() if x.compu_const == physical_value]
if scales:
inverse_value: Optional[AtomicOdxType] = scales[0].compu_inverse_value
if inverse_value is not None:
res = inverse_value
elif scales[0].lower_limit is not None:
res = scales[0].lower_limit.value
else:
res = None

odxassert(self.internal_type.isinstance(res))
return res
raise EncodeError(f"Texttable compu method could not encode '{physical_value}'.")
return res # type: ignore[return-value]

raise EncodeError(f"Texttable compu method could not encode '{physical_value!r}'.")

def __is_internal_in_scale(self, internal_value, scale: CompuScale):
def __is_internal_in_scale(self, internal_value: AtomicOdxType, scale: CompuScale) -> bool:
if scale == self.compu_default_value:
return True
if scale.lower_limit is not None and not scale.lower_limit.complies_to_lower(
Expand All @@ -60,23 +65,23 @@ def __is_internal_in_scale(self, internal_value, scale: CompuScale):
# value complies to the defined limits
return True

def convert_internal_to_physical(self, internal_value):
def convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType:
scale = next(
filter(
lambda scale: self.__is_internal_in_scale(internal_value, scale),
self._get_scales(),
), None)
if scale is None:
if scale is None or scale.compu_const is None:
raise DecodeError(
f"Texttable compu method could not decode {internal_value} to string.")
f"Texttable compu method could not decode {internal_value!r} to string.")
return scale.compu_const

def is_valid_physical_value(self, physical_value):
def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool:
return physical_value in self.get_valid_physical_values()

def is_valid_internal_value(self, internal_value):
def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool:
return any(
self.__is_internal_in_scale(internal_value, scale) for scale in self._get_scales())

def get_valid_physical_values(self):
def get_valid_physical_values(self) -> List[Optional[AtomicOdxType]]:
return [x.compu_const for x in self._get_scales()]
5 changes: 1 addition & 4 deletions odxtools/dataobjectproperty.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,7 @@ def convert_physical_to_bytes(self, physical_value: Any, encode_state: EncodeSta
"""
if not self.is_valid_physical_value(physical_value):
raise EncodeError(f"The value {repr(physical_value)} of type {type(physical_value)}"
f" is not a valid." +
(f" Valid values are {self.compu_method.get_valid_physical_values()}"
if self.compu_method.get_valid_physical_values(
) else f" Expected type {self.physical_type.base_data_type.value}."))
f" is not a valid.")

internal_val = self.convert_physical_to_internal(physical_value)
return self.diag_coded_type.convert_internal_to_bytes(
Expand Down

0 comments on commit 1b4b4b0

Please sign in to comment.