Skip to content

Commit

Permalink
Merge pull request #263 from andlaus/cleanups
Browse files Browse the repository at this point in the history
Cleanups
  • Loading branch information
andlaus authored Feb 6, 2024
2 parents f34c3fc + a340041 commit 76a4c44
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 40 deletions.
Binary file modified examples/somersault.pdx
Binary file not shown.
12 changes: 12 additions & 0 deletions examples/somersaultecu.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,18 @@ class SomersaultSID(IntEnum):
bit_position=None,
sdgs=[],
),
ValueParameter(
short_name="sault_time",
long_name=None,
semantic=None,
description=None,
physical_default_value_raw="255",
byte_position=2,
dop_ref=OdxLinkRef("somersault.DOP.duration", doc_frags),
dop_snref=None,
bit_position=None,
sdgs=[],
),
]),
byte_size=None,
),
Expand Down
50 changes: 36 additions & 14 deletions odxtools/basicstructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from .dataobjectproperty import DataObjectProperty
from .decodestate import DecodeState
from .encodestate import EncodeState
from .exceptions import DecodeError, EncodeError, OdxWarning, odxassert, odxraise
from .exceptions import DecodeError, EncodeError, OdxWarning, odxassert, odxraise, strict_mode
from .nameditemlist import NamedItemList
from .odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId
from .odxtypes import ParameterDict, ParameterValue, ParameterValueDict
Expand Down Expand Up @@ -74,10 +74,10 @@ def get_static_bit_length(self) -> Optional[int]:
def coded_const_prefix(self, request_prefix: bytes = b'') -> bytes:
prefix = b''
encode_state = EncodeState(prefix, parameter_values={}, triggering_request=request_prefix)
for p in self.parameters:
if isinstance(p, (CodedConstParameter, NrcConstParameter, MatchingRequestParameter,
PhysicalConstantParameter)):
encode_state.coded_message = p.encode_into_pdu(encode_state)
for param in self.parameters:
if isinstance(param, (CodedConstParameter, NrcConstParameter, MatchingRequestParameter,
PhysicalConstantParameter)):
encode_state.coded_message = param.encode_into_pdu(encode_state)
else:
break
return encode_state.coded_message
Expand Down Expand Up @@ -124,6 +124,13 @@ def convert_physical_to_internal(self,
f"Expected a dictionary for the values of structure {self.short_name}, "
f"got {type(param_value)}")

# in strict mode, ensure that no values for unknown parameters are specified.
if strict_mode:
param_names = [param.short_name for param in self.parameters]
for param_key in param_value:
if param_key not in param_names:
odxraise(f"Value for unknown parameter '{param_key}' specified")

encode_state = EncodeState(
b'',
dict(param_value),
Expand All @@ -139,6 +146,21 @@ def convert_physical_to_internal(self,
# the ODX is located last in the PDU...
encode_state.is_end_of_pdu = is_end_of_pdu

if isinstance(
param,
(LengthKeyParameter, TableKeyParameter)) and param.short_name in param_value:
# This is a hack to always encode a dummy value for
# length- and table keys. since these can be specified
# implicitly (i.e., by means of parameters that use
# these keys), to avoid getting an "overlapping
# parameter" warning, we must encode a value of zero
# into the PDU here and add the real value of the
# parameter in a post processing step.
tmp = encode_state.parameter_values.pop(param.short_name)
encode_state.coded_message = param.encode_into_pdu(encode_state)
encode_state.parameter_values[param.short_name] = tmp
continue

encode_state.coded_message = param.encode_into_pdu(encode_state)

if self.byte_size is not None and len(encode_state.coded_message) < self.byte_size:
Expand Down Expand Up @@ -213,10 +235,10 @@ def convert_bytes_to_physical(self,
inner_decode_state = DecodeState(
coded_message=byte_code, parameter_values={}, cursor_position=0)

for parameter in self.parameters:
value, cursor_position = parameter.decode_from_pdu(inner_decode_state)
for param in self.parameters:
value, cursor_position = param.decode_from_pdu(inner_decode_state)

inner_decode_state.parameter_values[parameter.short_name] = value
inner_decode_state.parameter_values[param.short_name] = value
inner_decode_state = DecodeState(
coded_message=byte_code,
parameter_values=inner_decode_state.parameter_values,
Expand Down Expand Up @@ -282,21 +304,21 @@ def parameter_dict(self) -> ParameterDict:
def _build_odxlinks(self) -> Dict[OdxLinkId, Any]:
result = super()._build_odxlinks()

for p in self.parameters:
result.update(p._build_odxlinks())
for param in self.parameters:
result.update(param._build_odxlinks())

return result

def _resolve_odxlinks(self, odxlinks: OdxLinkDatabase) -> None:
"""Recursively resolve any references (odxlinks or sn-refs)"""
super()._resolve_odxlinks(odxlinks)

for p in self.parameters:
p._resolve_odxlinks(odxlinks)
for param in self.parameters:
param._resolve_odxlinks(odxlinks)

def _resolve_snrefs(self, diag_layer: "DiagLayer") -> None:
"""Recursively resolve any references (odxlinks or sn-refs)"""
super()._resolve_snrefs(diag_layer)

for p in self.parameters:
p._resolve_snrefs(diag_layer)
for param in self.parameters:
param._resolve_snrefs(diag_layer)
31 changes: 20 additions & 11 deletions odxtools/cli/snoop.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import odxtools.uds as uds
from odxtools.exceptions import DecodeError
from odxtools.isotp_state_machine import IsoTpStateMachine
from odxtools.response import Response, ResponseType

from . import _parser_utils

Expand All @@ -31,7 +32,7 @@ def handle_telegram(telegram_id: int, payload: bytes) -> None:

if telegram_id == ecu_tx_id:
if uds.is_response_pending(payload):
print(f" -> ECU: ... (response pending)")
print(f" ... (response pending)")
return

decoded_message = None
Expand All @@ -48,21 +49,29 @@ def handle_telegram(telegram_id: int, payload: bytes) -> None:
if len(decoded_message) > 1:
dec_str = f" (decoding {i+1})"

response_type = getattr(resp.coding_object, "response_type", None)
rt_str = "unknown"
if response_type == "POS-RESPONSE":
rt_str = "positive"
elif response_type == "NEG-RESPONSE":
rt_str = "negative"

print(f" -> {rt_str} ECU response{dec_str}: \"{resp.coding_object.short_name}\"")
if isinstance(resp.coding_object, Response):
if resp.coding_object.response_type == ResponseType.POSITIVE:
rt_str = "positive"
elif resp.coding_object.response_type in (ResponseType.NEGATIVE,
ResponseType.GLOBAL_NEGATIVE):
rt_str = "negative"

settable_params = []
for param_name, param_val in resp.param_dict.items():
param = [x for x in params if x.short_name == param_name][0]
if not param.is_settable:
continue
print(f" {param_name} = {repr(param_val)}")
settable_params.append((param_name, param_val))

if settable_params:
print(f" {rt_str} response{dec_str} {resp.coding_object.short_name}:")
for param_name, param_val in settable_params:
print(f" {param_name} = {repr(param_val)}")
else:
print(f" {rt_str} response{dec_str} {resp.coding_object.short_name}")
else:
print(f" -> ECU response: unrecognized response of {len(payload)} bytes length: "
print(f" unrecognized response of {len(payload)} bytes length: "
f"0x{payload.hex()}")

return
Expand All @@ -76,7 +85,7 @@ def handle_telegram(telegram_id: int, payload: bytes) -> None:
last_request = None

if decoded_message is not None:
print(f"tester request: \"{decoded_message.coding_object.short_name}\"")
print(f"request {decoded_message.coding_object.short_name}:")
params = decoded_message.coding_object.parameters
for param_name, param_val in decoded_message.param_dict.items():
param = [x for x in params if x.short_name == param_name][0]
Expand Down
11 changes: 10 additions & 1 deletion odxtools/leadinglengthinfotype.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@

@dataclass
class LeadingLengthInfoType(DiagCodedType):
#: bit length of the length specifier field
#:
#: this is then followed by the number of bytes specified by that
#: field, i.e., this is NOT the length of the LeadingLengthInfoType
#: object.
bit_length: int

def __post_init__(self) -> None:
Expand All @@ -31,7 +36,11 @@ def dct_type(self) -> DctType:
return "LEADING-LENGTH-INFO-TYPE"

def get_static_bit_length(self) -> Optional[int]:
return self.bit_length
# note that self.bit_length is just the length of the length
# specifier field. This is then followed by the same number of
# bytes as the value of this field, i.e., the length of this
# DCT is dynamic!
return None

def convert_internal_to_bytes(self, internal_value: Any, encode_state: EncodeState,
bit_position: int) -> bytes:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_singleecujob.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,8 @@ def test_resolve_odxlinks(self) -> None:
self.assertEqual(self.context.specialAudience,
odxrequire(self.singleecujob_object.audience).enabled_audiences[0])

self.assertEqual(self.context.inputDOP, self.singleecujob_object.input_params[0].dop)
self.assertEqual(self.context.outputDOP, self.singleecujob_object.output_params[0].dop)
self.assertEqual(self.context.inputDOP, self.singleecujob_object.input_params[0].dop_base)
self.assertEqual(self.context.outputDOP, self.singleecujob_object.output_params[0].dop_base)
self.assertEqual(self.context.negOutputDOP,
self.singleecujob_object.neg_output_params[0].dop)

Expand Down
42 changes: 30 additions & 12 deletions tests/test_somersault.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# SPDX-License-Identifier: MIT
import unittest

from odxtools.exceptions import odxrequire
from odxtools.exceptions import OdxError, odxrequire
from odxtools.load_pdx_file import load_pdx_file
from odxtools.parameters.nrcconstparameter import NrcConstParameter

Expand Down Expand Up @@ -145,9 +145,11 @@ def test_somersault_lazy(self) -> None:
self.assertEqual([x.short_name for x in service.negative_responses], ["flips_not_done"])

pr = service.positive_responses.grudging_forward
self.assertEqual([x.short_name for x in pr.parameters], ["sid", "num_flips_done"])
self.assertEqual([x.short_name for x in pr.parameters],
["sid", "num_flips_done", "sault_time"])
self.assertEqual([x.short_name for x in pr.required_parameters], [])
self.assertEqual(pr.get_static_bit_length(), 16)
self.assertEqual([x.short_name for x in pr.free_parameters], ["sault_time"])
self.assertEqual(pr.get_static_bit_length(), 24)

nr = service.negative_responses.flips_not_done
self.assertEqual(
Expand All @@ -162,7 +164,16 @@ def test_somersault_lazy(self) -> None:
self.assertEqual(nrc_const.coded_values, [0, 1, 2])


class TestDecode(unittest.TestCase):
class TestEnDecode(unittest.TestCase):

def test_encode_specify_unknown_param(self) -> None:
ecu = odxdb.ecus.somersault_lazy
service = ecu.services.do_forward_flips
request = odxrequire(service.request)
with self.assertRaises(OdxError) as eo:
request.encode(forward_soberness_check=0x12, num_flips=5, grass_level="what grass?")

self.assertEqual(str(eo.exception), "Value for unknown parameter 'grass_level' specified")

def test_decode_request(self) -> None:
messages = odxdb.ecus.somersault_assiduous.decode(bytes([0x03, 0x45]))
Expand Down Expand Up @@ -226,9 +237,9 @@ def test_code_table_params(self) -> None:
dizzyness_level=42,
happiness_level=92,
last_pos_response=("forward_grudging", {
"dizzyness_level": 42
"sault_time": 249
}))
self.assertEqual(resp_data.hex(), "622a5c03fa7b")
self.assertEqual(resp_data.hex(), "622a5c03fa7bf9")

decoded_resp_data = pr.decode(resp_data)
assert isinstance(decoded_resp_data, dict)
Expand All @@ -241,21 +252,24 @@ def test_code_table_params(self) -> None:
self.assertEqual(
set(decoded_resp_data["last_pos_response"]
[1].keys()), # type: ignore[index, union-attr]
{"sid", "num_flips_done"})
{"sid", "num_flips_done", "sault_time"})
# the num_flips_done parameter is a matching request parameter
# for this response, so it produces a binary blob. possibly,
# it should be changed to a ValueParameter...
self.assertEqual(
decoded_resp_data["last_pos_response"][1] # type: ignore[index, call-overload]
["num_flips_done"], # type: ignore[index, call-overload]
bytes([123]))
self.assertEqual(
decoded_resp_data["last_pos_response"][1] # type: ignore[index, call-overload]
["sault_time"], # type: ignore[index, call-overload]
249)

# test the "backward flips grudgingly done" response
resp_data = pr.encode(
dizzyness_level=75,
happiness_level=3,
last_pos_response=("backward_grudging", {
'dizzyness_level': 75,
'num_flips_done': 5,
'grumpiness_level': 150
}))
Expand Down Expand Up @@ -310,14 +324,14 @@ def test_free_param_info(self) -> None:

with patch("sys.stdout", stdout):
pos_response.print_free_parameters_info()
expected_output = "forward_soberness_check: uint8\nnum_flips: uint8\n"
expected_output = "forward_soberness_check: uint8\nnum_flips: uint8\nsault_time: uint8\n"
actual_output = stdout.getvalue()
self.assertEqual(actual_output, expected_output)

with patch("sys.stdout", stdout):
neg_response.print_free_parameters_info()
expected_output = (
"forward_soberness_check: uint8\nnum_flips: uint8\nflips_successfully_done: uint8\n"
"forward_soberness_check: uint8\nnum_flips: uint8\nsault_time: uint8\nflips_successfully_done: uint8\n"
)
actual_output = stdout.getvalue()
self.assertEqual(actual_output, expected_output)
Expand All @@ -336,9 +350,13 @@ def test_decode_response(self) -> None:
f"There should be only one service for 0x0145 but there are: {messages}",
)
m = messages[0]
self.assertEqual(m.coded_message, bytes([0xFA, 0x03]))
self.assertEqual(m.coded_message.hex(), "fa03ff")
self.assertEqual(m.coding_object, pos_response)
self.assertEqual(m.param_dict, {"sid": 0xFA, "num_flips_done": bytearray([0x03])})
self.assertEqual(m.param_dict, {
"sid": 0xFA,
"num_flips_done": bytearray([0x03]),
"sault_time": 255
})


class TestNavigation(unittest.TestCase):
Expand Down

0 comments on commit 76a4c44

Please sign in to comment.