diff --git a/examples/somersault.pdx b/examples/somersault.pdx index a17710ae..874e58c8 100644 Binary files a/examples/somersault.pdx and b/examples/somersault.pdx differ diff --git a/examples/somersaultecu.py b/examples/somersaultecu.py index 831398b6..a94721b1 100755 --- a/examples/somersaultecu.py +++ b/examples/somersaultecu.py @@ -713,6 +713,18 @@ class SomersaultSID(IntEnum): bit_position=None, sdgs=[], ), + ValueParameter( + short_name="sault_time", + long_name=None, + semantic=None, + description=None, + physical_default_value_raw="255", + byte_position=2, + dop_ref=OdxLinkRef("somersault.DOP.duration", doc_frags), + dop_snref=None, + bit_position=None, + sdgs=[], + ), ]), byte_size=None, ), diff --git a/odxtools/basicstructure.py b/odxtools/basicstructure.py index 3acfc503..02e9b102 100644 --- a/odxtools/basicstructure.py +++ b/odxtools/basicstructure.py @@ -8,7 +8,7 @@ from .dataobjectproperty import DataObjectProperty from .decodestate import DecodeState from .encodestate import EncodeState -from .exceptions import DecodeError, EncodeError, OdxWarning, odxassert, odxraise +from .exceptions import DecodeError, EncodeError, OdxWarning, odxassert, odxraise, strict_mode from .nameditemlist import NamedItemList from .odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId from .odxtypes import ParameterDict, ParameterValue, ParameterValueDict @@ -74,10 +74,10 @@ def get_static_bit_length(self) -> Optional[int]: def coded_const_prefix(self, request_prefix: bytes = b'') -> bytes: prefix = b'' encode_state = EncodeState(prefix, parameter_values={}, triggering_request=request_prefix) - for p in self.parameters: - if isinstance(p, (CodedConstParameter, NrcConstParameter, MatchingRequestParameter, - PhysicalConstantParameter)): - encode_state.coded_message = p.encode_into_pdu(encode_state) + for param in self.parameters: + if isinstance(param, (CodedConstParameter, NrcConstParameter, MatchingRequestParameter, + PhysicalConstantParameter)): + encode_state.coded_message = param.encode_into_pdu(encode_state) else: break return encode_state.coded_message @@ -124,6 +124,13 @@ def convert_physical_to_internal(self, f"Expected a dictionary for the values of structure {self.short_name}, " f"got {type(param_value)}") + # in strict mode, ensure that no values for unknown parameters are specified. + if strict_mode: + param_names = [param.short_name for param in self.parameters] + for param_key in param_value: + if param_key not in param_names: + odxraise(f"Value for unknown parameter '{param_key}' specified") + encode_state = EncodeState( b'', dict(param_value), @@ -139,6 +146,21 @@ def convert_physical_to_internal(self, # the ODX is located last in the PDU... encode_state.is_end_of_pdu = is_end_of_pdu + if isinstance( + param, + (LengthKeyParameter, TableKeyParameter)) and param.short_name in param_value: + # This is a hack to always encode a dummy value for + # length- and table keys. since these can be specified + # implicitly (i.e., by means of parameters that use + # these keys), to avoid getting an "overlapping + # parameter" warning, we must encode a value of zero + # into the PDU here and add the real value of the + # parameter in a post processing step. + tmp = encode_state.parameter_values.pop(param.short_name) + encode_state.coded_message = param.encode_into_pdu(encode_state) + encode_state.parameter_values[param.short_name] = tmp + continue + encode_state.coded_message = param.encode_into_pdu(encode_state) if self.byte_size is not None and len(encode_state.coded_message) < self.byte_size: @@ -213,10 +235,10 @@ def convert_bytes_to_physical(self, inner_decode_state = DecodeState( coded_message=byte_code, parameter_values={}, cursor_position=0) - for parameter in self.parameters: - value, cursor_position = parameter.decode_from_pdu(inner_decode_state) + for param in self.parameters: + value, cursor_position = param.decode_from_pdu(inner_decode_state) - inner_decode_state.parameter_values[parameter.short_name] = value + inner_decode_state.parameter_values[param.short_name] = value inner_decode_state = DecodeState( coded_message=byte_code, parameter_values=inner_decode_state.parameter_values, @@ -282,8 +304,8 @@ def parameter_dict(self) -> ParameterDict: def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: result = super()._build_odxlinks() - for p in self.parameters: - result.update(p._build_odxlinks()) + for param in self.parameters: + result.update(param._build_odxlinks()) return result @@ -291,12 +313,12 @@ def _resolve_odxlinks(self, odxlinks: OdxLinkDatabase) -> None: """Recursively resolve any references (odxlinks or sn-refs)""" super()._resolve_odxlinks(odxlinks) - for p in self.parameters: - p._resolve_odxlinks(odxlinks) + for param in self.parameters: + param._resolve_odxlinks(odxlinks) def _resolve_snrefs(self, diag_layer: "DiagLayer") -> None: """Recursively resolve any references (odxlinks or sn-refs)""" super()._resolve_snrefs(diag_layer) - for p in self.parameters: - p._resolve_snrefs(diag_layer) + for param in self.parameters: + param._resolve_snrefs(diag_layer) diff --git a/odxtools/cli/snoop.py b/odxtools/cli/snoop.py index e725740c..19875593 100644 --- a/odxtools/cli/snoop.py +++ b/odxtools/cli/snoop.py @@ -12,6 +12,7 @@ import odxtools.uds as uds from odxtools.exceptions import DecodeError from odxtools.isotp_state_machine import IsoTpStateMachine +from odxtools.response import Response, ResponseType from . import _parser_utils @@ -31,7 +32,7 @@ def handle_telegram(telegram_id: int, payload: bytes) -> None: if telegram_id == ecu_tx_id: if uds.is_response_pending(payload): - print(f" -> ECU: ... (response pending)") + print(f" ... (response pending)") return decoded_message = None @@ -48,21 +49,29 @@ def handle_telegram(telegram_id: int, payload: bytes) -> None: if len(decoded_message) > 1: dec_str = f" (decoding {i+1})" - response_type = getattr(resp.coding_object, "response_type", None) rt_str = "unknown" - if response_type == "POS-RESPONSE": - rt_str = "positive" - elif response_type == "NEG-RESPONSE": - rt_str = "negative" - - print(f" -> {rt_str} ECU response{dec_str}: \"{resp.coding_object.short_name}\"") + if isinstance(resp.coding_object, Response): + if resp.coding_object.response_type == ResponseType.POSITIVE: + rt_str = "positive" + elif resp.coding_object.response_type in (ResponseType.NEGATIVE, + ResponseType.GLOBAL_NEGATIVE): + rt_str = "negative" + + settable_params = [] for param_name, param_val in resp.param_dict.items(): param = [x for x in params if x.short_name == param_name][0] if not param.is_settable: continue - print(f" {param_name} = {repr(param_val)}") + settable_params.append((param_name, param_val)) + + if settable_params: + print(f" {rt_str} response{dec_str} {resp.coding_object.short_name}:") + for param_name, param_val in settable_params: + print(f" {param_name} = {repr(param_val)}") + else: + print(f" {rt_str} response{dec_str} {resp.coding_object.short_name}") else: - print(f" -> ECU response: unrecognized response of {len(payload)} bytes length: " + print(f" unrecognized response of {len(payload)} bytes length: " f"0x{payload.hex()}") return @@ -76,7 +85,7 @@ def handle_telegram(telegram_id: int, payload: bytes) -> None: last_request = None if decoded_message is not None: - print(f"tester request: \"{decoded_message.coding_object.short_name}\"") + print(f"request {decoded_message.coding_object.short_name}:") params = decoded_message.coding_object.parameters for param_name, param_val in decoded_message.param_dict.items(): param = [x for x in params if x.short_name == param_name][0] diff --git a/odxtools/leadinglengthinfotype.py b/odxtools/leadinglengthinfotype.py index 1105c348..9fb5cb8b 100644 --- a/odxtools/leadinglengthinfotype.py +++ b/odxtools/leadinglengthinfotype.py @@ -11,6 +11,11 @@ @dataclass class LeadingLengthInfoType(DiagCodedType): + #: bit length of the length specifier field + #: + #: this is then followed by the number of bytes specified by that + #: field, i.e., this is NOT the length of the LeadingLengthInfoType + #: object. bit_length: int def __post_init__(self) -> None: @@ -31,7 +36,11 @@ def dct_type(self) -> DctType: return "LEADING-LENGTH-INFO-TYPE" def get_static_bit_length(self) -> Optional[int]: - return self.bit_length + # note that self.bit_length is just the length of the length + # specifier field. This is then followed by the same number of + # bytes as the value of this field, i.e., the length of this + # DCT is dynamic! + return None def convert_internal_to_bytes(self, internal_value: Any, encode_state: EncodeState, bit_position: int) -> bytes: diff --git a/tests/test_singleecujob.py b/tests/test_singleecujob.py index 79d49ceb..9fb6ea97 100644 --- a/tests/test_singleecujob.py +++ b/tests/test_singleecujob.py @@ -424,8 +424,8 @@ def test_resolve_odxlinks(self) -> None: self.assertEqual(self.context.specialAudience, odxrequire(self.singleecujob_object.audience).enabled_audiences[0]) - self.assertEqual(self.context.inputDOP, self.singleecujob_object.input_params[0].dop) - self.assertEqual(self.context.outputDOP, self.singleecujob_object.output_params[0].dop) + self.assertEqual(self.context.inputDOP, self.singleecujob_object.input_params[0].dop_base) + self.assertEqual(self.context.outputDOP, self.singleecujob_object.output_params[0].dop_base) self.assertEqual(self.context.negOutputDOP, self.singleecujob_object.neg_output_params[0].dop) diff --git a/tests/test_somersault.py b/tests/test_somersault.py index 516d53cf..5397dfd7 100644 --- a/tests/test_somersault.py +++ b/tests/test_somersault.py @@ -1,7 +1,7 @@ # SPDX-License-Identifier: MIT import unittest -from odxtools.exceptions import odxrequire +from odxtools.exceptions import OdxError, odxrequire from odxtools.load_pdx_file import load_pdx_file from odxtools.parameters.nrcconstparameter import NrcConstParameter @@ -145,9 +145,11 @@ def test_somersault_lazy(self) -> None: self.assertEqual([x.short_name for x in service.negative_responses], ["flips_not_done"]) pr = service.positive_responses.grudging_forward - self.assertEqual([x.short_name for x in pr.parameters], ["sid", "num_flips_done"]) + self.assertEqual([x.short_name for x in pr.parameters], + ["sid", "num_flips_done", "sault_time"]) self.assertEqual([x.short_name for x in pr.required_parameters], []) - self.assertEqual(pr.get_static_bit_length(), 16) + self.assertEqual([x.short_name for x in pr.free_parameters], ["sault_time"]) + self.assertEqual(pr.get_static_bit_length(), 24) nr = service.negative_responses.flips_not_done self.assertEqual( @@ -162,7 +164,16 @@ def test_somersault_lazy(self) -> None: self.assertEqual(nrc_const.coded_values, [0, 1, 2]) -class TestDecode(unittest.TestCase): +class TestEnDecode(unittest.TestCase): + + def test_encode_specify_unknown_param(self) -> None: + ecu = odxdb.ecus.somersault_lazy + service = ecu.services.do_forward_flips + request = odxrequire(service.request) + with self.assertRaises(OdxError) as eo: + request.encode(forward_soberness_check=0x12, num_flips=5, grass_level="what grass?") + + self.assertEqual(str(eo.exception), "Value for unknown parameter 'grass_level' specified") def test_decode_request(self) -> None: messages = odxdb.ecus.somersault_assiduous.decode(bytes([0x03, 0x45])) @@ -226,9 +237,9 @@ def test_code_table_params(self) -> None: dizzyness_level=42, happiness_level=92, last_pos_response=("forward_grudging", { - "dizzyness_level": 42 + "sault_time": 249 })) - self.assertEqual(resp_data.hex(), "622a5c03fa7b") + self.assertEqual(resp_data.hex(), "622a5c03fa7bf9") decoded_resp_data = pr.decode(resp_data) assert isinstance(decoded_resp_data, dict) @@ -241,7 +252,7 @@ def test_code_table_params(self) -> None: self.assertEqual( set(decoded_resp_data["last_pos_response"] [1].keys()), # type: ignore[index, union-attr] - {"sid", "num_flips_done"}) + {"sid", "num_flips_done", "sault_time"}) # the num_flips_done parameter is a matching request parameter # for this response, so it produces a binary blob. possibly, # it should be changed to a ValueParameter... @@ -249,13 +260,16 @@ def test_code_table_params(self) -> None: decoded_resp_data["last_pos_response"][1] # type: ignore[index, call-overload] ["num_flips_done"], # type: ignore[index, call-overload] bytes([123])) + self.assertEqual( + decoded_resp_data["last_pos_response"][1] # type: ignore[index, call-overload] + ["sault_time"], # type: ignore[index, call-overload] + 249) # test the "backward flips grudgingly done" response resp_data = pr.encode( dizzyness_level=75, happiness_level=3, last_pos_response=("backward_grudging", { - 'dizzyness_level': 75, 'num_flips_done': 5, 'grumpiness_level': 150 })) @@ -310,14 +324,14 @@ def test_free_param_info(self) -> None: with patch("sys.stdout", stdout): pos_response.print_free_parameters_info() - expected_output = "forward_soberness_check: uint8\nnum_flips: uint8\n" + expected_output = "forward_soberness_check: uint8\nnum_flips: uint8\nsault_time: uint8\n" actual_output = stdout.getvalue() self.assertEqual(actual_output, expected_output) with patch("sys.stdout", stdout): neg_response.print_free_parameters_info() expected_output = ( - "forward_soberness_check: uint8\nnum_flips: uint8\nflips_successfully_done: uint8\n" + "forward_soberness_check: uint8\nnum_flips: uint8\nsault_time: uint8\nflips_successfully_done: uint8\n" ) actual_output = stdout.getvalue() self.assertEqual(actual_output, expected_output) @@ -336,9 +350,13 @@ def test_decode_response(self) -> None: f"There should be only one service for 0x0145 but there are: {messages}", ) m = messages[0] - self.assertEqual(m.coded_message, bytes([0xFA, 0x03])) + self.assertEqual(m.coded_message.hex(), "fa03ff") self.assertEqual(m.coding_object, pos_response) - self.assertEqual(m.param_dict, {"sid": 0xFA, "num_flips_done": bytearray([0x03])}) + self.assertEqual(m.param_dict, { + "sid": 0xFA, + "num_flips_done": bytearray([0x03]), + "sault_time": 255 + }) class TestNavigation(unittest.TestCase):