Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add type annotations, part 2 #212

Merged
merged 13 commits into from
Sep 28, 2023
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
<!-- SPDX-License-Identifier: MIT -->
[![PyPi - Version](https://img.shields.io/pypi/v/odxtools)](https://pypi.org/project/odxtools)
[![PyPI - License](https://img.shields.io/pypi/l/odxtools)](LICENSE)
[![CI Status](https://github.com/mercedes-benz/odxtools/actions/workflows/tests.yml/badge.svg?branch=main)](https://github.com/mercedes-benz/odxtools/actions?query=branch%3Amain)

# odxtools

`odxtools` is a set of utilities for working with diagnostic
Expand Down Expand Up @@ -39,6 +43,7 @@ send to/received from ECUs in an pythonic manner.
- [Installation](#installation)
- [Usage Examples](#usage-examples)
- [Python snippets](#python-snippets)
- [Using the non-strict mode](#using-the-non-strict-mode)
- [Interactive Usage](#interactive-usage)
- [Python REPL](#python-repl)
- [Command line usage](#command-line-usage)
Expand Down Expand Up @@ -609,10 +614,6 @@ project, please read the [contributing guide](https://github.com/mercedes-benz/o
Please read our [Code of Conduct](https://github.com/mercedes-benz/daimler-foss/blob/master/CODE_OF_CONDUCT.md)
as it is our base for interaction.

## License
kayoub5 marked this conversation as resolved.
Show resolved Hide resolved

This project is licensed under the [MIT LICENSE](https://github.com/mercedes-benz/odxtools/blob/main/LICENSE).
kayoub5 marked this conversation as resolved.
Show resolved Hide resolved

## Provider Information

Please visit <https://mbition.io/en/home/index.html> for information on the provider.
Expand Down
15 changes: 6 additions & 9 deletions odxtools/compumethods/compumethod.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# SPDX-License-Identifier: MIT
import abc
from dataclasses import dataclass
from typing import Literal, Union
from typing import Literal

from ..odxtypes import DataType
from ..odxtypes import AtomicOdxType, DataType

CompuMethodCategory = Literal[
"IDENTICAL",
Expand All @@ -24,17 +24,14 @@ class CompuMethod(abc.ABC):
def category(self) -> CompuMethodCategory:
pass

def convert_physical_to_internal(self, physical_value):
def convert_physical_to_internal(self, physical_value: AtomicOdxType) -> AtomicOdxType:
raise NotImplementedError()

def convert_internal_to_physical(self, internal_value) -> Union[int, float, str]:
def convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType:
raise NotImplementedError()

def is_valid_physical_value(self, physical_value):
def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool:
raise NotImplementedError()

def is_valid_internal_value(self, internal_value):
def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool:
raise NotImplementedError()

def get_valid_physical_values(self):
return None
14 changes: 7 additions & 7 deletions odxtools/compumethods/createanycompumethod.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@

def _parse_compu_scale_to_linear_compu_method(
*,
scale_element,
scale_element: ElementTree.Element,
internal_type: DataType,
physical_type: DataType,
is_scale_linear=False,
**kwargs,
):
is_scale_linear: bool = False,
**kwargs: Any,
) -> LinearCompuMethod:
odxassert(physical_type in [
DataType.A_FLOAT32,
DataType.A_FLOAT64,
Expand All @@ -47,12 +47,12 @@ def _parse_compu_scale_to_linear_compu_method(
kwargs["internal_type"] = internal_type
kwargs["physical_type"] = physical_type

coeffs = scale_element.find("COMPU-RATIONAL-COEFFS")
coeffs = odxrequire(scale_element.find("COMPU-RATIONAL-COEFFS"))
nums = coeffs.iterfind("COMPU-NUMERATOR/V")

offset = computation_python_type(next(nums).text)
offset = computation_python_type(odxrequire(next(nums).text))
factor_el = next(nums, None)
factor = computation_python_type(factor_el.text if factor_el is not None else "0")
factor = computation_python_type(odxrequire(factor_el.text) if factor_el is not None else "0")
denominator = 1.0
if (string := coeffs.findtext("COMPU-DENOMINATOR/V")) is not None:
denominator = float(string)
Expand Down
9 changes: 5 additions & 4 deletions odxtools/compumethods/identicalcompumethod.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass

from ..odxtypes import AtomicOdxType
from .compumethod import CompuMethod, CompuMethodCategory


Expand All @@ -11,14 +12,14 @@ class IdenticalCompuMethod(CompuMethod):
def category(self) -> CompuMethodCategory:
return "IDENTICAL"

def convert_physical_to_internal(self, physical_value):
def convert_physical_to_internal(self, physical_value: AtomicOdxType) -> AtomicOdxType:
return physical_value

def convert_internal_to_physical(self, internal_value):
def convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType:
return internal_value

def is_valid_physical_value(self, physical_value):
def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool:
return self.physical_type.isinstance(physical_value)

def is_valid_internal_value(self, internal_value):
def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool:
return self.internal_type.isinstance(internal_value)
18 changes: 9 additions & 9 deletions odxtools/compumethods/limit.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Union
from typing import Optional
from xml.etree import ElementTree

from ..exceptions import odxassert, odxraise, odxrequire
from ..odxtypes import DataType
from ..odxtypes import AtomicOdxType, DataType


class IntervalType(Enum):
Expand All @@ -16,7 +16,7 @@ class IntervalType(Enum):

@dataclass
class Limit:
value: Union[str, int, float, bytes]
value: AtomicOdxType
interval_type: IntervalType = IntervalType.CLOSED

def __post_init__(self) -> None:
Expand Down Expand Up @@ -53,30 +53,30 @@ def from_et(et_element: Optional[ElementTree.Element], *,
else:
return Limit(internal_type.from_string(odxrequire(et_element.text)), interval_type)

def complies_to_upper(self, value):
def complies_to_upper(self, value: AtomicOdxType) -> bool:
"""Checks if the value is in the range w.r.t. the upper limit.

* If the interval type is closed, return `value <= limit.value`.
* If the interval type is open, return `value < limit.value`.
* If the interval type is infinite, return `True`.
"""
if self.interval_type == IntervalType.CLOSED:
return value <= self.value
return value <= self.value # type: ignore[operator]
elif self.interval_type == IntervalType.OPEN:
return value < self.value
return value < self.value # type: ignore[operator]
elif self.interval_type == IntervalType.INFINITE:
return True

def complies_to_lower(self, value):
def complies_to_lower(self, value: AtomicOdxType) -> bool:
"""Checks if the value is in the range w.r.t. the lower limit.

* If the interval type is closed, return `limit.value <= value`.
* If the interval type is open, return `limit.value < value`.
* If the interval type is infinite, return `True`.
"""
if self.interval_type == IntervalType.CLOSED:
return self.value <= value
return self.value <= value # type: ignore[operator]
elif self.interval_type == IntervalType.OPEN:
return self.value < value
return self.value < value # type: ignore[operator]
elif self.interval_type == IntervalType.INFINITE:
return True
42 changes: 25 additions & 17 deletions odxtools/compumethods/linearcompumethod.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from typing import Union
from typing import cast

from ..exceptions import odxassert
from ..odxtypes import DataType
from ..exceptions import DecodeError, EncodeError, odxassert
from ..odxtypes import AtomicOdxType, DataType
from .compumethod import CompuMethod, CompuMethodCategory
from .limit import IntervalType, Limit

Expand Down Expand Up @@ -73,20 +73,20 @@ def category(self) -> CompuMethodCategory:
return "LINEAR"

@property
def physical_lower_limit(self):
def physical_lower_limit(self) -> Limit:
return self._physical_lower_limit

@property
def physical_upper_limit(self):
def physical_upper_limit(self) -> Limit:
return self._physical_upper_limit

def __compute_physical_limits(self):
def __compute_physical_limits(self) -> None:
"""Computes the physical limits and stores them in the properties
self._physical_lower_limit and self._physical_upper_limit.
This method is only called during the initialization of a LinearCompuMethod.
"""

def convert_to_limit_to_physical(limit: Limit, is_upper_limit: bool):
def convert_to_limit_to_physical(limit: Limit, is_upper_limit: bool) -> Limit:
"""Helper method

Parameters:
Expand Down Expand Up @@ -127,10 +127,14 @@ def convert_to_limit_to_physical(limit: Limit, is_upper_limit: bool):
if self.physical_type == DataType.A_UINT32:
# If the data type is unsigned, the physical lower limit should be at least 0.
if (self._physical_lower_limit.interval_type == IntervalType.INFINITE or
self._physical_lower_limit.value < 0):
cast(float, self._physical_lower_limit.value) < 0):
self._physical_lower_limit = Limit(value=0, interval_type=IntervalType.CLOSED)

def _convert_internal_to_physical(self, internal_value):
def _convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType:
if not isinstance(internal_value, (int, float)):
raise DecodeError("The type of internal values of linear compumethods must "
"either int or float")

if self.denominator is None:
result = self.offset + self.factor * internal_value
else:
Expand All @@ -143,11 +147,15 @@ def _convert_internal_to_physical(self, internal_value):
result = round(result)
return self.physical_type.make_from(result)

def convert_internal_to_physical(self, internal_value) -> Union[int, float]:
def convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType:
odxassert(self.is_valid_internal_value(internal_value))
return self._convert_internal_to_physical(internal_value)

def convert_physical_to_internal(self, physical_value):
def convert_physical_to_internal(self, physical_value: AtomicOdxType) -> AtomicOdxType:
if not isinstance(physical_value, (int, float)):
raise EncodeError("The type of physical values of linear compumethods must "
"either int or float")

odxassert(
self.is_valid_physical_value(physical_value),
f"physical value {physical_value} of type {type(physical_value)} "
Expand All @@ -165,12 +173,12 @@ def convert_physical_to_internal(self, physical_value):
result = round(result)
return self.internal_type.make_from(result)

def is_valid_physical_value(self, physical_value):
def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool:
# Do type checks
expected_type = self.physical_type.as_python_type()
if expected_type == float and type(physical_value) not in [int, float]:
if expected_type == float and not isinstance(physical_value, (int, float)):
return False
elif expected_type != float and type(physical_value) != expected_type:
elif expected_type != float and not isinstance(physical_value, expected_type):
return False

# Compare to the limits
Expand All @@ -180,11 +188,11 @@ def is_valid_physical_value(self, physical_value):
return False
return True

def is_valid_internal_value(self, internal_value):
def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool:
expected_type = self.internal_type.as_python_type()
if expected_type == float and type(internal_value) not in [int, float]:
if expected_type == float and not isinstance(internal_value, (int, float)):
return False
elif expected_type != float and type(internal_value) != expected_type:
elif expected_type != float and not isinstance(internal_value, expected_type):
return False

if not self.internal_lower_limit.complies_to_lower(internal_value):
Expand Down
11 changes: 6 additions & 5 deletions odxtools/compumethods/scalelinearcompumethod.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import List

from ..exceptions import odxassert
from ..odxtypes import AtomicOdxType
from .compumethod import CompuMethod, CompuMethodCategory
from .linearcompumethod import LinearCompuMethod

Expand All @@ -15,24 +16,24 @@ class ScaleLinearCompuMethod(CompuMethod):
def category(self) -> CompuMethodCategory:
return "SCALE-LINEAR"

def convert_physical_to_internal(self, physical_value):
def convert_physical_to_internal(self, physical_value: AtomicOdxType) -> AtomicOdxType:
odxassert(
self.is_valid_physical_value(physical_value),
f"cannot convert the invalid physical value {physical_value} "
f"cannot convert the invalid physical value {physical_value!r} "
f"of type {type(physical_value)}")
lin_method = next(
scale for scale in self.linear_methods if scale.is_valid_physical_value(physical_value))
return lin_method.convert_physical_to_internal(physical_value)

def convert_internal_to_physical(self, internal_value):
def convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType:
lin_method = next(
scale for scale in self.linear_methods if scale.is_valid_internal_value(internal_value))
return lin_method.convert_internal_to_physical(internal_value)

def is_valid_physical_value(self, physical_value):
def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool:
return any(
True for scale in self.linear_methods if scale.is_valid_physical_value(physical_value))

def is_valid_internal_value(self, internal_value):
def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool:
return any(
True for scale in self.linear_methods if scale.is_valid_internal_value(internal_value))
24 changes: 19 additions & 5 deletions odxtools/compumethods/tabintpcompumethod.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import List, Tuple, Union

from ..exceptions import DecodeError, EncodeError, odxassert, odxraise
from ..odxtypes import DataType
from ..odxtypes import AtomicOdxType, DataType
from .compumethod import CompuMethod, CompuMethodCategory
from .limit import IntervalType, Limit

Expand Down Expand Up @@ -115,7 +115,11 @@ def _piecewise_linear_interpolate(self, x: Union[int, float],

return None

def convert_physical_to_internal(self, physical_value: Union[int, float]) -> Union[int, float]:
def convert_physical_to_internal(self, physical_value: AtomicOdxType) -> AtomicOdxType:
if not isinstance(physical_value, (int, float)):
raise EncodeError("The type of values of tab-intp compumethods must "
"either int or float")

reference_points = list(zip(self.physical_points, self.internal_points))
result = self._piecewise_linear_interpolate(physical_value, reference_points)

Expand All @@ -127,7 +131,11 @@ def convert_physical_to_internal(self, physical_value: Union[int, float]) -> Uni
odxraise()
return res

def convert_internal_to_physical(self, internal_value: Union[int, float]) -> Union[int, float]:
def convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType:
if not isinstance(internal_value, (int, float)):
raise EncodeError("The internal type of values of tab-intp compumethods must "
"either int or float")

reference_points = list(zip(self.internal_points, self.physical_points))
result = self._piecewise_linear_interpolate(internal_value, reference_points)

Expand All @@ -139,10 +147,16 @@ def convert_internal_to_physical(self, internal_value: Union[int, float]) -> Uni
odxraise()
return res

def is_valid_physical_value(self, physical_value: Union[int, float]) -> bool:
def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool:
if not isinstance(physical_value, (int, float)):
return False

return min(self.physical_points) <= physical_value and physical_value <= max(
self.physical_points)

def is_valid_internal_value(self, internal_value: Union[int, float]) -> bool:
def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool:
if not isinstance(internal_value, (int, float)):
return False

return min(self.internal_points) <= internal_value and internal_value <= max(
self.internal_points)
Loading