From a1da9cd6c1cc0b769cee700c4e719e057053b7a8 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Fri, 22 Sep 2023 19:54:27 +0200 Subject: [PATCH 01/13] minor fixes for README.md - update table of contents - remove duplicate License section Signed-off-by: Andreas Lauser Signed-off-by: Christian Hackenbeck Signed-off-by: Alexander Walz --- README.md | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/README.md b/README.md index bc146f8f..7ee870f9 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,7 @@ send to/received from ECUs in an pythonic manner. - [Installation](#installation) - [Usage Examples](#usage-examples) - [Python snippets](#python-snippets) +- [Using the non-strict mode](#using-the-non-strict-mode) - [Interactive Usage](#interactive-usage) - [Python REPL](#python-repl) - [Command line usage](#command-line-usage) @@ -609,10 +610,6 @@ project, please read the [contributing guide](https://github.com/mercedes-benz/o Please read our [Code of Conduct](https://github.com/mercedes-benz/daimler-foss/blob/master/CODE_OF_CONDUCT.md) as it is our base for interaction. -## License - -This project is licensed under the [MIT LICENSE](https://github.com/mercedes-benz/odxtools/blob/main/LICENSE). - ## Provider Information Please visit for information on the provider. From ee09da1c62095b69adb63a1d19ae14388d7aa84f Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Tue, 26 Sep 2023 11:11:47 +0200 Subject: [PATCH 02/13] Parameter: remove unused `get_coded_value()` method there's still `.get_coded_value_as_bytes(encode_state)`, but that one is most definitely used. Signed-off-by: Andreas Lauser Signed-off-by: Christian Hackenbeck Signed-off-by: Alexander Walz --- odxtools/parameters/codedconstparameter.py | 3 --- odxtools/parameters/dynamicparameter.py | 3 --- odxtools/parameters/matchingrequestparameter.py | 3 --- odxtools/parameters/nrcconstparameter.py | 3 --- odxtools/parameters/parameter.py | 4 ---- odxtools/parameters/parameterwithdop.py | 3 --- odxtools/parameters/physicalconstantparameter.py | 3 --- odxtools/parameters/reservedparameter.py | 3 --- odxtools/parameters/systemparameter.py | 3 --- odxtools/parameters/tableentryparameter.py | 3 --- odxtools/parameters/tablekeyparameter.py | 8 -------- odxtools/parameters/tablestructparameter.py | 4 ---- odxtools/parameters/valueparameter.py | 9 --------- 13 files changed, 52 deletions(-) diff --git a/odxtools/parameters/codedconstparameter.py b/odxtools/parameters/codedconstparameter.py index fa330c32..7122db10 100644 --- a/odxtools/parameters/codedconstparameter.py +++ b/odxtools/parameters/codedconstparameter.py @@ -55,9 +55,6 @@ def is_required(self) -> bool: def is_settable(self) -> bool: return False - def get_coded_value(self): - return self.coded_value - def get_coded_value_as_bytes(self, encode_state: EncodeState): if (self.short_name in encode_state.parameter_values and encode_state.parameter_values[self.short_name] != self.coded_value): diff --git a/odxtools/parameters/dynamicparameter.py b/odxtools/parameters/dynamicparameter.py index ace3e269..a51d6eab 100644 --- a/odxtools/parameters/dynamicparameter.py +++ b/odxtools/parameters/dynamicparameter.py @@ -19,9 +19,6 @@ def is_required(self) -> bool: def is_settable(self) -> bool: raise NotImplementedError(".is_settable for a DynamicParameter") - def get_coded_value(self): - raise NotImplementedError("Encoding a DynamicParameter is not implemented yet.") - def get_coded_value_as_bytes(self): raise NotImplementedError("Encoding a DynamicParameter is not implemented yet.") diff --git a/odxtools/parameters/matchingrequestparameter.py b/odxtools/parameters/matchingrequestparameter.py index 0b71c417..5e6a3fd2 100644 --- a/odxtools/parameters/matchingrequestparameter.py +++ b/odxtools/parameters/matchingrequestparameter.py @@ -28,9 +28,6 @@ def is_required(self) -> bool: def is_settable(self) -> bool: return False - def get_coded_value(self, request_value=None): - return request_value - def get_coded_value_as_bytes(self, encode_state: EncodeState): if not encode_state.triggering_request: raise EncodeError(f"Parameter '{self.short_name}' is of matching request type," diff --git a/odxtools/parameters/nrcconstparameter.py b/odxtools/parameters/nrcconstparameter.py index af6c1a1d..3aec7335 100644 --- a/odxtools/parameters/nrcconstparameter.py +++ b/odxtools/parameters/nrcconstparameter.py @@ -63,9 +63,6 @@ def is_required(self) -> bool: def is_settable(self) -> bool: return False - def get_coded_value(self): - return self.coded_value - def get_coded_value_as_bytes(self, encode_state: EncodeState): if self.short_name in encode_state.parameter_values: if encode_state.parameter_values[self.short_name] not in self.coded_values: diff --git a/odxtools/parameters/parameter.py b/odxtools/parameters/parameter.py index e6bb2d12..a5967ade 100644 --- a/odxtools/parameters/parameter.py +++ b/odxtools/parameters/parameter.py @@ -85,10 +85,6 @@ def is_settable(self) -> bool: """ raise NotImplementedError - @abc.abstractmethod - def get_coded_value(self): - pass - @abc.abstractmethod def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: """Get the coded value of the parameter given the encode state. diff --git a/odxtools/parameters/parameterwithdop.py b/odxtools/parameters/parameterwithdop.py index 5de836b4..1b32a5ee 100644 --- a/odxtools/parameters/parameterwithdop.py +++ b/odxtools/parameters/parameterwithdop.py @@ -73,9 +73,6 @@ def physical_type(self) -> Optional[PhysicalType]: else: return None - def get_coded_value(self, physical_value=None): - return self.dop.convert_physical_to_internal(physical_value) - def get_coded_value_as_bytes(self, encode_state: EncodeState): dop = odxrequire(self.dop, "Reference to DOP is not resolved") physical_value = encode_state.parameter_values[self.short_name] diff --git a/odxtools/parameters/physicalconstantparameter.py b/odxtools/parameters/physicalconstantparameter.py index 7d1eb8d6..43cb879e 100644 --- a/odxtools/parameters/physicalconstantparameter.py +++ b/odxtools/parameters/physicalconstantparameter.py @@ -52,9 +52,6 @@ def is_required(self) -> bool: def is_settable(self) -> bool: return False - def get_coded_value(self): - return self.dop.convert_physical_to_internal(self.physical_constant_value) - def get_coded_value_as_bytes(self, encode_state: EncodeState): dop = odxrequire(self.dop, "Reference to DOP is not resolved") if (self.short_name in encode_state.parameter_values and diff --git a/odxtools/parameters/reservedparameter.py b/odxtools/parameters/reservedparameter.py index a2f5b680..0caa8355 100644 --- a/odxtools/parameters/reservedparameter.py +++ b/odxtools/parameters/reservedparameter.py @@ -32,9 +32,6 @@ def bit_length(self) -> int: # need to take the "bit_length_raw" detour... return self.bit_length_raw - def get_coded_value(self): - return 0 - def get_coded_value_as_bytes(self, encode_state): bit_position_int = self.bit_position if self.bit_position is not None else 0 return (0).to_bytes((self.bit_length + bit_position_int + 7) // 8, "big") diff --git a/odxtools/parameters/systemparameter.py b/odxtools/parameters/systemparameter.py index 2f63b01e..aa08983a 100644 --- a/odxtools/parameters/systemparameter.py +++ b/odxtools/parameters/systemparameter.py @@ -21,9 +21,6 @@ def is_required(self) -> bool: def is_settable(self) -> bool: raise NotImplementedError("SystemParameter.is_settable is not implemented yet.") - def get_coded_value(self): - raise NotImplementedError("Encoding a SystemParameter is not implemented yet.") - def get_coded_value_as_bytes(self): raise NotImplementedError("Encoding a SystemParameter is not implemented yet.") diff --git a/odxtools/parameters/tableentryparameter.py b/odxtools/parameters/tableentryparameter.py index 017b3ad4..bd5487d5 100644 --- a/odxtools/parameters/tableentryparameter.py +++ b/odxtools/parameters/tableentryparameter.py @@ -22,9 +22,6 @@ def is_required(self) -> bool: def is_settable(self) -> bool: raise NotImplementedError("TableKeyParameter.is_settable is not implemented yet.") - def get_coded_value(self): - raise NotImplementedError("Encoding a TableKeyParameter is not implemented yet.") - def get_coded_value_as_bytes(self): raise NotImplementedError("Encoding a TableKeyParameter is not implemented yet.") diff --git a/odxtools/parameters/tablekeyparameter.py b/odxtools/parameters/tablekeyparameter.py index 4bcfc364..7d7f4d5d 100644 --- a/odxtools/parameters/tablekeyparameter.py +++ b/odxtools/parameters/tablekeyparameter.py @@ -94,14 +94,6 @@ def is_required(self) -> bool: def is_settable(self) -> bool: return True - def get_coded_value(self, physical_value=None) -> Any: - key_dop = self.table.key_dop - if key_dop is None: - raise EncodeError(f"Table '{self.table.short_name}' does not define " - f"a KEY-DOP, but is used in TABLE-KEY parameter " - f"'{self.short_name}'") - return key_dop.convert_physical_to_internal(physical_value) - def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: tr_short_name = encode_state.parameter_values.get(self.short_name) diff --git a/odxtools/parameters/tablestructparameter.py b/odxtools/parameters/tablestructparameter.py index d2a86366..5e41b00e 100644 --- a/odxtools/parameters/tablestructparameter.py +++ b/odxtools/parameters/tablestructparameter.py @@ -60,10 +60,6 @@ def is_required(self): def is_settable(self): return True - def get_coded_value(self, physical_value=None): - raise EncodeError("TableStructParameters cannot be converted to " - "internal values without a table row.") - def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: physical_value = encode_state.parameter_values.get(self.short_name) diff --git a/odxtools/parameters/valueparameter.py b/odxtools/parameters/valueparameter.py index 5e4841c9..785a6a9e 100644 --- a/odxtools/parameters/valueparameter.py +++ b/odxtools/parameters/valueparameter.py @@ -55,15 +55,6 @@ def is_required(self) -> bool: def is_settable(self) -> bool: return True - def get_coded_value(self, physical_value: Optional[AtomicOdxType] = None): - if physical_value is not None: - dop = odxrequire(self.dop) - if not isinstance(dop, DataObjectProperty): - odxraise() - return dop.convert_physical_to_internal(physical_value) - else: - return self.physical_default_value - def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: physical_value = encode_state.parameter_values.get(self.short_name, self.physical_default_value) From 20091091eddeef100bd6100c40969b97bcac60ea Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Tue, 19 Sep 2023 12:19:01 +0200 Subject: [PATCH 03/13] isotp_state_machine.py: more type annotations for whatever reason, mypy did not complain about these last time... Signed-off-by: Andreas Lauser Signed-off-by: Christian Hackenbeck Signed-off-by: Alexander Walz --- odxtools/isotp_state_machine.py | 70 ++++++++++++++++++++------------- 1 file changed, 42 insertions(+), 28 deletions(-) diff --git a/odxtools/isotp_state_machine.py b/odxtools/isotp_state_machine.py index 85ce7081..17fbc0bc 100644 --- a/odxtools/isotp_state_machine.py +++ b/odxtools/isotp_state_machine.py @@ -5,7 +5,7 @@ import re import sys from enum import IntEnum -from typing import Iterable, List, Optional, Tuple, Union +from typing import AsyncGenerator, Iterable, List, Optional, TextIO, Tuple, Union import bitstruct import can @@ -103,7 +103,8 @@ def decode_rx_frame(self, rx_id: int, data: bytes) -> Iterable[Tuple[int, bytes] else: self.on_frame_type_error(telegram_idx, frame_type) - async def read_telegrams(self, bus): + async def read_telegrams(self, bus: Union[can.Bus, + TextIO]) -> AsyncGenerator[Tuple[int, bytes], None]: """This is equivalent to the :py:meth:`file.readlines()` method, but it yields ISO-TP telegrams instead of lines. @@ -112,12 +113,12 @@ async def read_telegrams(self, bus): :param bus: Input file or socket of can bus to read the can frames """ - if hasattr(bus, "set_filters"): + if isinstance(bus, can.Bus): # this is a bit of a hack: passing any object which # exhibits a set_filters() method is assumed to be a can # bus object. - # creat an "on receive" event for the can bus + # create an "on receive" event for the can bus rx_event = asyncio.Event() loop = asyncio.get_running_loop() loop.add_reader(bus, rx_event.set) @@ -126,11 +127,13 @@ async def read_telegrams(self, bus): await rx_event.wait() msg = bus.recv() + if msg is None: + continue for tmp in self.decode_rx_frame(msg.arbitration_id, msg.data): yield tmp else: + assert isinstance(bus, TextIO) # input is a file - while bus: cur_line = bus.readline() if cur_line == "": @@ -165,14 +168,14 @@ async def read_telegrams(self, bus): f"Warning: unrecognized frame format: '{cur_line.strip()}'", file=sys.stderr) - def can_rx_id(self, telegram_idx): + def can_rx_id(self, telegram_idx: int) -> int: """Given a Telegram index, returns the CAN ID for receiving data. :raises IndexError: The telegram index is invalid. """ return self._can_rx_ids[telegram_idx] - def telegram_data(self, telegram_idx): + def telegram_data(self, telegram_idx: int) -> Optional[bytes]: """Given a Telegram index, returns the data received for this telegram so far. @@ -183,31 +186,32 @@ def telegram_data(self, telegram_idx): ############## # Callbacks ############## - def on_single_frame(self, telegram_idx, frame_payload): - """Method called when an ISO-TP message of type "single frame" has been received""" + def on_single_frame(self, telegram_idx: int, frame_payload: bytes) -> None: + """Callback method for when an ISO-TP message of type "single frame" has been received""" pass - def on_first_frame(self, telegram_idx, frame_payload): - """Method called when an ISO-TP message of type "first frame" has been received""" + def on_first_frame(self, telegram_idx: int, frame_payload: bytes) -> None: + """Callback method for when an ISO-TP message of type "first frame" has been received""" pass - def on_consecutive_frame(self, telegram_idx, segment_idx, frame_payload): - """Method called when an ISO-TP message of type "consecutive frame" has been received""" + def on_consecutive_frame(self, telegram_idx: int, segment_idx: int, + frame_payload: bytes) -> None: + """Callback method for when an ISO-TP message of type "consecutive frame" has been received""" pass - def on_flow_control_frame(self, telegram_idx, flow_control_flag): + def on_flow_control_frame(self, telegram_idx: int, flow_control_flag: int) -> None: """Method called when an ISO-TP message of type "flow control frame" has been received""" pass - def on_sequence_error(self, telegram_idx, expected_idx, rx_idx): + def on_sequence_error(self, telegram_idx: int, expected_idx: int, rx_idx: int) -> None: """Method called when a frame with an unexpected sequence index has been received""" pass - def on_frame_type_error(self, telegram_idx, frame_type): + def on_frame_type_error(self, telegram_idx: int, frame_type: int) -> None: """Method called when a frame exhibiting an unknown frame type has been received""" pass - def on_telegram_complete(self, telegram_idx, telegram_payload): + def on_telegram_complete(self, telegram_idx: int, telegram_payload: bytes) -> None: """Method called when an ISO-TP telegram has been fully received""" pass @@ -218,7 +222,12 @@ class IsoTpActiveDecoder(IsoTpStateMachine): receive ISO-TP messages actively instead of just snooping in on other people's conversations.""" - def __init__(self, can_bus, can_rx_ids, can_tx_ids, padding_size=0, padding_value=0xAA): + def __init__(self, + can_bus: can.Bus, + can_rx_ids: List[int], + can_tx_ids: List[int], + padding_size: int = 0, + padding_value: int = 0xAA): self._can_bus = can_bus if isinstance(can_tx_ids, int): @@ -230,14 +239,13 @@ def __init__(self, can_bus, can_rx_ids, can_tx_ids, padding_size=0, padding_valu super().__init__(can_rx_ids) - assert isinstance(self._can_tx_ids, list) assert len(self._can_rx_ids) == len(self._can_tx_ids) assert set(self._can_rx_ids).isdisjoint(set(self._can_tx_ids)) # correct? - self._block_size = [None] * len(self._can_rx_ids) - self._frames_received = [None] * len(self._can_rx_ids) + self._block_size: List[Optional[int]] = [None] * len(self._can_rx_ids) + self._frames_received: List[Optional[int]] = [None] * len(self._can_rx_ids) - def can_tx_id(self, telegram_idx): + def can_tx_id(self, telegram_idx: int) -> int: """Given a Telegram index, returns the CAN ID for sending data. :raises TypeError: No transmission IDs specified (e.g., @@ -246,7 +254,7 @@ def can_tx_id(self, telegram_idx): """ return self._can_tx_ids[telegram_idx] - def on_single_frame(self, telegram_idx, frame_payload): + def on_single_frame(self, telegram_idx: int, frame_payload: bytes) -> None: # send ACK #rx_id = self.can_rx_id(telegram_idx) tx_id = self.can_tx_id(telegram_idx) @@ -265,7 +273,7 @@ def on_single_frame(self, telegram_idx, frame_payload): super().on_first_frame(telegram_idx, frame_payload) - def on_first_frame(self, telegram_idx, frame_payload): + def on_first_frame(self, telegram_idx: int, frame_payload: bytes) -> None: # send ACK #rx_id = self.can_rx_id(telegram_idx) tx_id = self.can_tx_id(telegram_idx) @@ -287,12 +295,18 @@ def on_first_frame(self, telegram_idx, frame_payload): super().on_first_frame(telegram_idx, frame_payload) - def on_consecutive_frame(self, telegram_idx, segment_idx, frame_payload): - self._frames_received[telegram_idx] += 1 + def on_consecutive_frame(self, telegram_idx: int, segment_idx: int, + frame_payload: bytes) -> None: + num_received = self._frames_received[telegram_idx] + if num_received is None: + # consequtive frame received before a first frame. + # TODO (?): throw an exception + return + self._frames_received[telegram_idx] = num_received + 1 # send new ACK if necessary block_size = self._block_size[telegram_idx] - if self._frames_received[telegram_idx] >= block_size: + if block_size is not None and num_received >= block_size: #rx_id = self.can_rx_id(telegram_idx) tx_id = self.can_tx_id(telegram_idx) min_separation_time = 0 # ms @@ -309,7 +323,7 @@ def on_consecutive_frame(self, telegram_idx, segment_idx, frame_payload): super().on_consecutive_frame(telegram_idx, segment_idx, frame_payload) - def _send_can_message(self, can_tx_id, payload): + def _send_can_message(self, can_tx_id: int, payload: bytes) -> None: if len(payload) < self._padding_size: payload = bytes(payload) + bytes([self._padding_value] * (self._padding_size - len(payload))) From 482e13925641a03122dcba7efe209c2724aadbd0 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Tue, 19 Sep 2023 12:20:04 +0200 Subject: [PATCH 04/13] diagcodedtype.py: add minor comments Signed-off-by: Andreas Lauser Signed-off-by: Christian Hackenbeck Signed-off-by: Alexander Walz --- odxtools/diagcodedtype.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/odxtools/diagcodedtype.py b/odxtools/diagcodedtype.py index 50fee788..65a19f54 100644 --- a/odxtools/diagcodedtype.py +++ b/odxtools/diagcodedtype.py @@ -15,6 +15,7 @@ if TYPE_CHECKING: from .diaglayer import DiagLayer +# format specifiers for the data type using the bitstruct module ODX_TYPE_TO_FORMAT_LETTER = { DataType.A_INT32: "s", DataType.A_UINT32: "u", @@ -26,6 +27,7 @@ DataType.A_UTF8STRING: "t", } +# Allowed diag-coded types DctType = Literal[ "LEADING-LENGTH-INFO-TYPE", "MIN-MAX-LENGTH-TYPE", From 31d8a57d7dbc5049cda5676f2da8016d5ee07e9f Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Tue, 19 Sep 2023 12:21:11 +0200 Subject: [PATCH 05/13] PhysicalDimension: add type annotation to nested method Signed-off-by: Andreas Lauser Signed-off-by: Christian Hackenbeck Signed-off-by: Alexander Walz --- odxtools/physicaldimension.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/odxtools/physicaldimension.py b/odxtools/physicaldimension.py index 233a958b..6832c517 100644 --- a/odxtools/physicaldimension.py +++ b/odxtools/physicaldimension.py @@ -58,7 +58,7 @@ def from_et(et_element: ElementTree.Element, kwargs = dataclass_fields_asdict(IdentifiableElement.from_et(et_element, doc_frags)) oid = et_element.get("OID") - def read_optional_int(element, name): + def read_optional_int(element: ElementTree.Element, name: str) -> int: if val_str := element.findtext(name): return int(val_str) else: From b3840dd8cc66e9df5f3a79f95076518778aa2ce7 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Tue, 19 Sep 2023 12:22:08 +0200 Subject: [PATCH 06/13] Modification: add missing type annotations Signed-off-by: Andreas Lauser Signed-off-by: Christian Hackenbeck Signed-off-by: Alexander Walz --- odxtools/modification.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/odxtools/modification.py b/odxtools/modification.py index 997bcf9f..dc2a673d 100644 --- a/odxtools/modification.py +++ b/odxtools/modification.py @@ -24,8 +24,8 @@ def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment]) -> def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: return {} - def _resolve_odxlinks(self, odxlinks: OdxLinkDatabase): + def _resolve_odxlinks(self, odxlinks: OdxLinkDatabase) -> None: pass - def _resolve_snrefs(self, diag_layer: "DiagLayer"): + def _resolve_snrefs(self, diag_layer: "DiagLayer") -> None: pass From 48f13a14a0c37f109b7e54a629a485d5fce9f3c4 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Tue, 19 Sep 2023 12:25:28 +0200 Subject: [PATCH 07/13] Unit: add/fix type annotations Signed-off-by: Andreas Lauser Signed-off-by: Christian Hackenbeck Signed-off-by: Alexander Walz --- odxtools/unit.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/odxtools/unit.py b/odxtools/unit.py index 83b45bac..38f058df 100644 --- a/odxtools/unit.py +++ b/odxtools/unit.py @@ -60,7 +60,7 @@ class Unit(IdentifiableElement): offset_si_to_unit: Optional[float] physical_dimension_ref: Optional[OdxLinkRef] - def __post_init__(self): + def __post_init__(self) -> None: self._physical_dimension = None @staticmethod @@ -69,9 +69,9 @@ def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment]) -> oid = et_element.get("OID") display_name = odxrequire(et_element.findtext("DISPLAY-NAME")) - def read_optional_float(element, name): - if element.findtext(name): - return float(element.findtext(name)) + def read_optional_float(element: ElementTree.Element, name: str) -> Optional[float]: + if (elem_str := element.findtext(name)) is not None: + return float(elem_str) else: return None @@ -89,7 +89,7 @@ def read_optional_float(element, name): **kwargs) @property - def physical_dimension(self) -> PhysicalDimension: + def physical_dimension(self) -> Optional[PhysicalDimension]: return self._physical_dimension def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: From f591b3bee298d87a8c174eeb38769a323a6d9f24 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Tue, 19 Sep 2023 12:27:23 +0200 Subject: [PATCH 08/13] PhysicalType: add/fix type annotations Signed-off-by: Andreas Lauser Signed-off-by: Christian Hackenbeck Signed-off-by: Alexander Walz --- odxtools/physicaltype.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/odxtools/physicaltype.py b/odxtools/physicaltype.py index b937c292..6cb8ecef 100644 --- a/odxtools/physicaltype.py +++ b/odxtools/physicaltype.py @@ -51,13 +51,13 @@ class PhysicalType: The precision is only applicable if the base data type is A_FLOAT32 or A_FLOAT64. """ - def __post_init__(self): + def __post_init__(self) -> None: self.base_data_type = DataType(self.base_data_type) if self.display_radix is not None: self.display_radix = Radix(self.display_radix) @staticmethod - def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment]): + def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment]) -> "PhysicalType": base_data_type_str = et_element.get("BASE-DATA-TYPE") if base_data_type_str not in DataType.__members__: odxraise(f"Encountered unknown base data type '{base_data_type_str}'") From 20c8bf0d3b1320d0b43ef9f13d284c0d414148c3 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Tue, 19 Sep 2023 12:32:01 +0200 Subject: [PATCH 09/13] Limit: add/fix type annotations Signed-off-by: Andreas Lauser Signed-off-by: Christian Hackenbeck Signed-off-by: Alexander Walz --- odxtools/compumethods/limit.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/odxtools/compumethods/limit.py b/odxtools/compumethods/limit.py index 085c444a..18182942 100644 --- a/odxtools/compumethods/limit.py +++ b/odxtools/compumethods/limit.py @@ -1,11 +1,11 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass from enum import Enum -from typing import Optional, Union +from typing import Optional from xml.etree import ElementTree from ..exceptions import odxassert, odxraise, odxrequire -from ..odxtypes import DataType +from ..odxtypes import AtomicOdxType, DataType class IntervalType(Enum): @@ -16,7 +16,7 @@ class IntervalType(Enum): @dataclass class Limit: - value: Union[str, int, float, bytes] + value: AtomicOdxType interval_type: IntervalType = IntervalType.CLOSED def __post_init__(self) -> None: @@ -53,7 +53,7 @@ def from_et(et_element: Optional[ElementTree.Element], *, else: return Limit(internal_type.from_string(odxrequire(et_element.text)), interval_type) - def complies_to_upper(self, value): + def complies_to_upper(self, value: AtomicOdxType) -> bool: """Checks if the value is in the range w.r.t. the upper limit. * If the interval type is closed, return `value <= limit.value`. @@ -61,13 +61,13 @@ def complies_to_upper(self, value): * If the interval type is infinite, return `True`. """ if self.interval_type == IntervalType.CLOSED: - return value <= self.value + return value <= self.value # type: ignore[operator] elif self.interval_type == IntervalType.OPEN: - return value < self.value + return value < self.value # type: ignore[operator] elif self.interval_type == IntervalType.INFINITE: return True - def complies_to_lower(self, value): + def complies_to_lower(self, value: AtomicOdxType) -> bool: """Checks if the value is in the range w.r.t. the lower limit. * If the interval type is closed, return `limit.value <= value`. @@ -75,8 +75,8 @@ def complies_to_lower(self, value): * If the interval type is infinite, return `True`. """ if self.interval_type == IntervalType.CLOSED: - return self.value <= value + return self.value <= value # type: ignore[operator] elif self.interval_type == IntervalType.OPEN: - return self.value < value + return self.value < value # type: ignore[operator] elif self.interval_type == IntervalType.INFINITE: return True From 3bc3f6048c1f4dc0e45949651326fa33e13c7593 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Tue, 19 Sep 2023 12:36:54 +0200 Subject: [PATCH 10/13] CompuMethod: add/fix type annotations Signed-off-by: Andreas Lauser Signed-off-by: Christian Hackenbeck Signed-off-by: Alexander Walz --- odxtools/compumethods/compumethod.py | 14 ++++++------ odxtools/compumethods/tabintpcompumethod.py | 24 ++++++++++++++++----- odxtools/parameterinfo.py | 4 ++-- 3 files changed, 28 insertions(+), 14 deletions(-) diff --git a/odxtools/compumethods/compumethod.py b/odxtools/compumethods/compumethod.py index 9df65560..9abbe060 100644 --- a/odxtools/compumethods/compumethod.py +++ b/odxtools/compumethods/compumethod.py @@ -1,9 +1,9 @@ # SPDX-License-Identifier: MIT import abc from dataclasses import dataclass -from typing import Literal, Union +from typing import List, Literal, Optional -from ..odxtypes import DataType +from ..odxtypes import AtomicOdxType, DataType CompuMethodCategory = Literal[ "IDENTICAL", @@ -24,17 +24,17 @@ class CompuMethod(abc.ABC): def category(self) -> CompuMethodCategory: pass - def convert_physical_to_internal(self, physical_value): + def convert_physical_to_internal(self, physical_value: AtomicOdxType) -> AtomicOdxType: raise NotImplementedError() - def convert_internal_to_physical(self, internal_value) -> Union[int, float, str]: + def convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType: raise NotImplementedError() - def is_valid_physical_value(self, physical_value): + def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool: raise NotImplementedError() - def is_valid_internal_value(self, internal_value): + def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool: raise NotImplementedError() - def get_valid_physical_values(self): + def get_valid_physical_values(self) -> Optional[List[DataType]]: return None diff --git a/odxtools/compumethods/tabintpcompumethod.py b/odxtools/compumethods/tabintpcompumethod.py index 0c6c2299..6be2b80b 100644 --- a/odxtools/compumethods/tabintpcompumethod.py +++ b/odxtools/compumethods/tabintpcompumethod.py @@ -3,7 +3,7 @@ from typing import List, Tuple, Union from ..exceptions import DecodeError, EncodeError, odxassert, odxraise -from ..odxtypes import DataType +from ..odxtypes import AtomicOdxType, DataType from .compumethod import CompuMethod, CompuMethodCategory from .limit import IntervalType, Limit @@ -115,7 +115,11 @@ def _piecewise_linear_interpolate(self, x: Union[int, float], return None - def convert_physical_to_internal(self, physical_value: Union[int, float]) -> Union[int, float]: + def convert_physical_to_internal(self, physical_value: AtomicOdxType) -> AtomicOdxType: + if not isinstance(physical_value, (int, float)): + raise EncodeError("The type of values of tab-intp compumethods must " + "either int or float") + reference_points = list(zip(self.physical_points, self.internal_points)) result = self._piecewise_linear_interpolate(physical_value, reference_points) @@ -127,7 +131,11 @@ def convert_physical_to_internal(self, physical_value: Union[int, float]) -> Uni odxraise() return res - def convert_internal_to_physical(self, internal_value: Union[int, float]) -> Union[int, float]: + def convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType: + if not isinstance(internal_value, (int, float)): + raise EncodeError("The internal type of values of tab-intp compumethods must " + "either int or float") + reference_points = list(zip(self.internal_points, self.physical_points)) result = self._piecewise_linear_interpolate(internal_value, reference_points) @@ -139,10 +147,16 @@ def convert_internal_to_physical(self, internal_value: Union[int, float]) -> Uni odxraise() return res - def is_valid_physical_value(self, physical_value: Union[int, float]) -> bool: + def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool: + if not isinstance(physical_value, (int, float)): + return False + return min(self.physical_points) <= physical_value and physical_value <= max( self.physical_points) - def is_valid_internal_value(self, internal_value: Union[int, float]) -> bool: + def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool: + if not isinstance(internal_value, (int, float)): + return False + return min(self.internal_points) <= internal_value and internal_value <= max( self.internal_points) diff --git a/odxtools/parameterinfo.py b/odxtools/parameterinfo.py index b3fe8575..210e400d 100644 --- a/odxtools/parameterinfo.py +++ b/odxtools/parameterinfo.py @@ -86,8 +86,8 @@ def parameter_info(param_list: Iterable[Union[Parameter, EndOfPduField]]) -> str ul = cm.physical_upper_limit result += (f" range: " f"{'[' if ll.interval_type == IntervalType.CLOSED else '('}" - f"{ll.value}, " - f"{ul.value}" + f"{ll.value!r}, " + f"{ul.value!r}" f"{']' if ul.interval_type == IntervalType.CLOSED else ')'}\n") unit = dop.unit From 1b4b4b076cf4088755b6fcfba482843a2f63ca27 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Wed, 27 Sep 2023 13:40:25 +0200 Subject: [PATCH 11/13] compu methods: add/fix type annotations Signed-off-by: Andreas Lauser Signed-off-by: Christian Hackenbeck Signed-off-by: Alexander Walz --- odxtools/compumethods/compumethod.py | 5 +-- odxtools/compumethods/createanycompumethod.py | 14 +++---- odxtools/compumethods/identicalcompumethod.py | 9 ++-- odxtools/compumethods/linearcompumethod.py | 42 +++++++++++-------- .../compumethods/scalelinearcompumethod.py | 11 ++--- odxtools/compumethods/texttablecompumethod.py | 41 ++++++++++-------- odxtools/dataobjectproperty.py | 5 +-- 7 files changed, 68 insertions(+), 59 deletions(-) diff --git a/odxtools/compumethods/compumethod.py b/odxtools/compumethods/compumethod.py index 9abbe060..5a43fa81 100644 --- a/odxtools/compumethods/compumethod.py +++ b/odxtools/compumethods/compumethod.py @@ -1,7 +1,7 @@ # SPDX-License-Identifier: MIT import abc from dataclasses import dataclass -from typing import List, Literal, Optional +from typing import Literal from ..odxtypes import AtomicOdxType, DataType @@ -35,6 +35,3 @@ def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool: def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool: raise NotImplementedError() - - def get_valid_physical_values(self) -> Optional[List[DataType]]: - return None diff --git a/odxtools/compumethods/createanycompumethod.py b/odxtools/compumethods/createanycompumethod.py index 51d4d275..e4e9ffa5 100644 --- a/odxtools/compumethods/createanycompumethod.py +++ b/odxtools/compumethods/createanycompumethod.py @@ -19,12 +19,12 @@ def _parse_compu_scale_to_linear_compu_method( *, - scale_element, + scale_element: ElementTree.Element, internal_type: DataType, physical_type: DataType, - is_scale_linear=False, - **kwargs, -): + is_scale_linear: bool = False, + **kwargs: Any, +) -> LinearCompuMethod: odxassert(physical_type in [ DataType.A_FLOAT32, DataType.A_FLOAT64, @@ -47,12 +47,12 @@ def _parse_compu_scale_to_linear_compu_method( kwargs["internal_type"] = internal_type kwargs["physical_type"] = physical_type - coeffs = scale_element.find("COMPU-RATIONAL-COEFFS") + coeffs = odxrequire(scale_element.find("COMPU-RATIONAL-COEFFS")) nums = coeffs.iterfind("COMPU-NUMERATOR/V") - offset = computation_python_type(next(nums).text) + offset = computation_python_type(odxrequire(next(nums).text)) factor_el = next(nums, None) - factor = computation_python_type(factor_el.text if factor_el is not None else "0") + factor = computation_python_type(odxrequire(factor_el.text) if factor_el is not None else "0") denominator = 1.0 if (string := coeffs.findtext("COMPU-DENOMINATOR/V")) is not None: denominator = float(string) diff --git a/odxtools/compumethods/identicalcompumethod.py b/odxtools/compumethods/identicalcompumethod.py index 46a57d56..08eb86b7 100644 --- a/odxtools/compumethods/identicalcompumethod.py +++ b/odxtools/compumethods/identicalcompumethod.py @@ -1,6 +1,7 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass +from ..odxtypes import AtomicOdxType from .compumethod import CompuMethod, CompuMethodCategory @@ -11,14 +12,14 @@ class IdenticalCompuMethod(CompuMethod): def category(self) -> CompuMethodCategory: return "IDENTICAL" - def convert_physical_to_internal(self, physical_value): + def convert_physical_to_internal(self, physical_value: AtomicOdxType) -> AtomicOdxType: return physical_value - def convert_internal_to_physical(self, internal_value): + def convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType: return internal_value - def is_valid_physical_value(self, physical_value): + def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool: return self.physical_type.isinstance(physical_value) - def is_valid_internal_value(self, internal_value): + def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool: return self.internal_type.isinstance(internal_value) diff --git a/odxtools/compumethods/linearcompumethod.py b/odxtools/compumethods/linearcompumethod.py index 4e00e1d1..e83518cc 100644 --- a/odxtools/compumethods/linearcompumethod.py +++ b/odxtools/compumethods/linearcompumethod.py @@ -1,9 +1,9 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import Union +from typing import cast -from ..exceptions import odxassert -from ..odxtypes import DataType +from ..exceptions import DecodeError, EncodeError, odxassert +from ..odxtypes import AtomicOdxType, DataType from .compumethod import CompuMethod, CompuMethodCategory from .limit import IntervalType, Limit @@ -73,20 +73,20 @@ def category(self) -> CompuMethodCategory: return "LINEAR" @property - def physical_lower_limit(self): + def physical_lower_limit(self) -> Limit: return self._physical_lower_limit @property - def physical_upper_limit(self): + def physical_upper_limit(self) -> Limit: return self._physical_upper_limit - def __compute_physical_limits(self): + def __compute_physical_limits(self) -> None: """Computes the physical limits and stores them in the properties self._physical_lower_limit and self._physical_upper_limit. This method is only called during the initialization of a LinearCompuMethod. """ - def convert_to_limit_to_physical(limit: Limit, is_upper_limit: bool): + def convert_to_limit_to_physical(limit: Limit, is_upper_limit: bool) -> Limit: """Helper method Parameters: @@ -127,10 +127,14 @@ def convert_to_limit_to_physical(limit: Limit, is_upper_limit: bool): if self.physical_type == DataType.A_UINT32: # If the data type is unsigned, the physical lower limit should be at least 0. if (self._physical_lower_limit.interval_type == IntervalType.INFINITE or - self._physical_lower_limit.value < 0): + cast(float, self._physical_lower_limit.value) < 0): self._physical_lower_limit = Limit(value=0, interval_type=IntervalType.CLOSED) - def _convert_internal_to_physical(self, internal_value): + def _convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType: + if not isinstance(internal_value, (int, float)): + raise DecodeError("The type of internal values of linear compumethods must " + "either int or float") + if self.denominator is None: result = self.offset + self.factor * internal_value else: @@ -143,11 +147,15 @@ def _convert_internal_to_physical(self, internal_value): result = round(result) return self.physical_type.make_from(result) - def convert_internal_to_physical(self, internal_value) -> Union[int, float]: + def convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType: odxassert(self.is_valid_internal_value(internal_value)) return self._convert_internal_to_physical(internal_value) - def convert_physical_to_internal(self, physical_value): + def convert_physical_to_internal(self, physical_value: AtomicOdxType) -> AtomicOdxType: + if not isinstance(physical_value, (int, float)): + raise EncodeError("The type of physical values of linear compumethods must " + "either int or float") + odxassert( self.is_valid_physical_value(physical_value), f"physical value {physical_value} of type {type(physical_value)} " @@ -165,12 +173,12 @@ def convert_physical_to_internal(self, physical_value): result = round(result) return self.internal_type.make_from(result) - def is_valid_physical_value(self, physical_value): + def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool: # Do type checks expected_type = self.physical_type.as_python_type() - if expected_type == float and type(physical_value) not in [int, float]: + if expected_type == float and not isinstance(physical_value, (int, float)): return False - elif expected_type != float and type(physical_value) != expected_type: + elif expected_type != float and not isinstance(physical_value, expected_type): return False # Compare to the limits @@ -180,11 +188,11 @@ def is_valid_physical_value(self, physical_value): return False return True - def is_valid_internal_value(self, internal_value): + def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool: expected_type = self.internal_type.as_python_type() - if expected_type == float and type(internal_value) not in [int, float]: + if expected_type == float and not isinstance(internal_value, (int, float)): return False - elif expected_type != float and type(internal_value) != expected_type: + elif expected_type != float and not isinstance(internal_value, expected_type): return False if not self.internal_lower_limit.complies_to_lower(internal_value): diff --git a/odxtools/compumethods/scalelinearcompumethod.py b/odxtools/compumethods/scalelinearcompumethod.py index dac60dc3..ba726e70 100644 --- a/odxtools/compumethods/scalelinearcompumethod.py +++ b/odxtools/compumethods/scalelinearcompumethod.py @@ -3,6 +3,7 @@ from typing import List from ..exceptions import odxassert +from ..odxtypes import AtomicOdxType from .compumethod import CompuMethod, CompuMethodCategory from .linearcompumethod import LinearCompuMethod @@ -15,24 +16,24 @@ class ScaleLinearCompuMethod(CompuMethod): def category(self) -> CompuMethodCategory: return "SCALE-LINEAR" - def convert_physical_to_internal(self, physical_value): + def convert_physical_to_internal(self, physical_value: AtomicOdxType) -> AtomicOdxType: odxassert( self.is_valid_physical_value(physical_value), - f"cannot convert the invalid physical value {physical_value} " + f"cannot convert the invalid physical value {physical_value!r} " f"of type {type(physical_value)}") lin_method = next( scale for scale in self.linear_methods if scale.is_valid_physical_value(physical_value)) return lin_method.convert_physical_to_internal(physical_value) - def convert_internal_to_physical(self, internal_value): + def convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType: lin_method = next( scale for scale in self.linear_methods if scale.is_valid_internal_value(internal_value)) return lin_method.convert_internal_to_physical(internal_value) - def is_valid_physical_value(self, physical_value): + def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool: return any( True for scale in self.linear_methods if scale.is_valid_physical_value(physical_value)) - def is_valid_internal_value(self, internal_value): + def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool: return any( True for scale in self.linear_methods if scale.is_valid_internal_value(internal_value)) diff --git a/odxtools/compumethods/texttablecompumethod.py b/odxtools/compumethods/texttablecompumethod.py index 60025cf1..d22ea1fb 100644 --- a/odxtools/compumethods/texttablecompumethod.py +++ b/odxtools/compumethods/texttablecompumethod.py @@ -3,7 +3,7 @@ from typing import List, Optional from ..exceptions import DecodeError, EncodeError, odxassert -from ..odxtypes import DataType +from ..odxtypes import AtomicOdxType, DataType from .compumethod import CompuMethod, CompuMethodCategory from .compuscale import CompuScale @@ -28,25 +28,30 @@ def __post_init__(self) -> None: def category(self) -> CompuMethodCategory: return "TEXTTABLE" - def _get_scales(self): + def _get_scales(self) -> List[CompuScale]: scales = list(self.internal_to_phys) if self.compu_default_value: # Default is last, since it's a fallback scales.append(self.compu_default_value) return scales - def convert_physical_to_internal(self, physical_value): - scale: CompuScale = next( - filter(lambda scale: scale.compu_const == physical_value, self._get_scales()), None) - if scale is not None: - res = ( - scale.compu_inverse_value - if scale.compu_inverse_value is not None else scale.lower_limit.value) + def convert_physical_to_internal(self, physical_value: AtomicOdxType) -> AtomicOdxType: + scales = [x for x in self._get_scales() if x.compu_const == physical_value] + if scales: + inverse_value: Optional[AtomicOdxType] = scales[0].compu_inverse_value + if inverse_value is not None: + res = inverse_value + elif scales[0].lower_limit is not None: + res = scales[0].lower_limit.value + else: + res = None + odxassert(self.internal_type.isinstance(res)) - return res - raise EncodeError(f"Texttable compu method could not encode '{physical_value}'.") + return res # type: ignore[return-value] + + raise EncodeError(f"Texttable compu method could not encode '{physical_value!r}'.") - def __is_internal_in_scale(self, internal_value, scale: CompuScale): + def __is_internal_in_scale(self, internal_value: AtomicOdxType, scale: CompuScale) -> bool: if scale == self.compu_default_value: return True if scale.lower_limit is not None and not scale.lower_limit.complies_to_lower( @@ -60,23 +65,23 @@ def __is_internal_in_scale(self, internal_value, scale: CompuScale): # value complies to the defined limits return True - def convert_internal_to_physical(self, internal_value): + def convert_internal_to_physical(self, internal_value: AtomicOdxType) -> AtomicOdxType: scale = next( filter( lambda scale: self.__is_internal_in_scale(internal_value, scale), self._get_scales(), ), None) - if scale is None: + if scale is None or scale.compu_const is None: raise DecodeError( - f"Texttable compu method could not decode {internal_value} to string.") + f"Texttable compu method could not decode {internal_value!r} to string.") return scale.compu_const - def is_valid_physical_value(self, physical_value): + def is_valid_physical_value(self, physical_value: AtomicOdxType) -> bool: return physical_value in self.get_valid_physical_values() - def is_valid_internal_value(self, internal_value): + def is_valid_internal_value(self, internal_value: AtomicOdxType) -> bool: return any( self.__is_internal_in_scale(internal_value, scale) for scale in self._get_scales()) - def get_valid_physical_values(self): + def get_valid_physical_values(self) -> List[Optional[AtomicOdxType]]: return [x.compu_const for x in self._get_scales()] diff --git a/odxtools/dataobjectproperty.py b/odxtools/dataobjectproperty.py index 59665605..0ac9b1b9 100644 --- a/odxtools/dataobjectproperty.py +++ b/odxtools/dataobjectproperty.py @@ -145,10 +145,7 @@ def convert_physical_to_bytes(self, physical_value: Any, encode_state: EncodeSta """ if not self.is_valid_physical_value(physical_value): raise EncodeError(f"The value {repr(physical_value)} of type {type(physical_value)}" - f" is not a valid." + - (f" Valid values are {self.compu_method.get_valid_physical_values()}" - if self.compu_method.get_valid_physical_values( - ) else f" Expected type {self.physical_type.base_data_type.value}.")) + f" is not a valid.") internal_val = self.convert_physical_to_internal(physical_value) return self.diag_coded_type.convert_internal_to_bytes( From c661c3149ce1594110048c6eec362a010d8eb0d3 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Wed, 27 Sep 2023 16:38:08 +0200 Subject: [PATCH 12/13] README.md: add a few status badges thanks to [at]kayoub5 for the suggestion! Signed-off-by: Andreas Lauser Signed-off-by: Christian Hackenbeck Signed-off-by: Alexander Walz --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 7ee870f9..7ac91d25 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,8 @@ +[![PyPi - Version](https://img.shields.io/pypi/v/odxtools)](https://pypi.org/project/odxtools) +[![PyPI - License](https://img.shields.io/pypi/l/odxtools)](LICENSE) +[![CI Status](https://github.com/mercedes-benz/odxtools/actions/workflows/tests.yml/badge.svg?branch=main)](https://github.com/mercedes-benz/odxtools/actions?query=branch%3Amain) + # odxtools `odxtools` is a set of utilities for working with diagnostic From cf061ff96f33bbb91ad22b496639ef1ee8c3f815 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Wed, 27 Sep 2023 16:45:03 +0200 Subject: [PATCH 13/13] consider review comments As usual the thanks go to [at]kayoub5! Signed-off-by: Andreas Lauser Signed-off-by: Christian Hackenbeck Signed-off-by: Alexander Walz --- odxtools/compumethods/texttablecompumethod.py | 20 ++++++++----------- odxtools/isotp_state_machine.py | 4 ---- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/odxtools/compumethods/texttablecompumethod.py b/odxtools/compumethods/texttablecompumethod.py index d22ea1fb..7df89040 100644 --- a/odxtools/compumethods/texttablecompumethod.py +++ b/odxtools/compumethods/texttablecompumethod.py @@ -36,18 +36,14 @@ def _get_scales(self) -> List[CompuScale]: return scales def convert_physical_to_internal(self, physical_value: AtomicOdxType) -> AtomicOdxType: - scales = [x for x in self._get_scales() if x.compu_const == physical_value] - if scales: - inverse_value: Optional[AtomicOdxType] = scales[0].compu_inverse_value - if inverse_value is not None: - res = inverse_value - elif scales[0].lower_limit is not None: - res = scales[0].lower_limit.value - else: - res = None - - odxassert(self.internal_type.isinstance(res)) - return res # type: ignore[return-value] + matching_scales = [x for x in self._get_scales() if x.compu_const == physical_value] + for scale in matching_scales: + if scale.compu_inverse_value is not None: + return scale.compu_inverse_value + elif scale.lower_limit is not None: + return scale.lower_limit.value + elif scale.upper_limit is not None: + return scale.upper_limit.value raise EncodeError(f"Texttable compu method could not encode '{physical_value!r}'.") diff --git a/odxtools/isotp_state_machine.py b/odxtools/isotp_state_machine.py index 17fbc0bc..43efe3a2 100644 --- a/odxtools/isotp_state_machine.py +++ b/odxtools/isotp_state_machine.py @@ -114,10 +114,6 @@ async def read_telegrams(self, bus: Union[can.Bus, """ if isinstance(bus, can.Bus): - # this is a bit of a hack: passing any object which - # exhibits a set_filters() method is assumed to be a can - # bus object. - # create an "on receive" event for the can bus rx_event = asyncio.Event() loop = asyncio.get_running_loop()