Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement de- and encoding of environment data descriptions #321

Merged
merged 4 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions odxtools/basicstructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from .dataobjectproperty import DataObjectProperty
from .decodestate import DecodeState
from .encodestate import EncodeState
from .exceptions import EncodeError, OdxWarning, odxassert, odxraise, strict_mode
from .exceptions import EncodeError, OdxWarning, odxassert, odxraise
from .nameditemlist import NamedItemList
from .odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId
from .odxtypes import ParameterDict, ParameterValue, ParameterValueDict
Expand Down Expand Up @@ -78,6 +78,7 @@ def coded_const_prefix(self, request_prefix: bytes = b'') -> bytes:
param.encode_into_pdu(physical_value=None, encode_state=encode_state)
else:
break

return encode_state.coded_message

@property
Expand Down Expand Up @@ -159,8 +160,8 @@ def encode_into_pdu(self, physical_value: Optional[ParameterValue],
orig_is_end_of_pdu = encode_state.is_end_of_pdu
encode_state.is_end_of_pdu = False

# in strict mode, ensure that no values for unknown parameters are specified.
if strict_mode:
# ensure that no values for unknown parameters are specified.
if not encode_state.allow_unknown_parameters:
param_names = {param.short_name for param in self.parameters}
for param_value_name in physical_value:
if param_value_name not in param_names:
Expand Down Expand Up @@ -192,8 +193,10 @@ def encode_into_pdu(self, physical_value: Optional[ParameterValue],
odxraise(f"No value for required parameter {param.short_name} specified",
EncodeError)

param.encode_into_pdu(
physical_value=physical_value.get(param.short_name), encode_state=encode_state)
param_phys_value = physical_value.get(param.short_name)
param.encode_into_pdu(physical_value=param_phys_value, encode_state=encode_state)

encode_state.journal.append((param, param_phys_value))

encode_state.is_end_of_pdu = False
if self.byte_size is not None:
Expand Down Expand Up @@ -235,6 +238,7 @@ def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
for param in self.parameters:
value = param.decode_from_pdu(decode_state)

decode_state.journal.append((param, value))
result[param.short_name] = value

# decoding of the structure finished. go back the original origin.
Expand Down
10 changes: 8 additions & 2 deletions odxtools/decodestate.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Dict, cast
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, cast

import odxtools.exceptions as exceptions

from .exceptions import DecodeError
from .odxtypes import AtomicOdxType, DataType
from .odxtypes import AtomicOdxType, DataType, ParameterValue

try:
import bitstruct.c as bitstruct
except ImportError:
import bitstruct

if TYPE_CHECKING:
from .parameters.parameter import Parameter
from .tablerow import TableRow


Expand Down Expand Up @@ -46,6 +47,11 @@ class DecodeState:
#: values of the table key parameters decoded so far
table_keys: Dict[str, "TableRow"] = field(default_factory=dict)

#: List of parameters that have been decoded so far. The journal
#: is used by some types of parameters which depend on the values of
#: other parameters; i.e., environment data description parameters
journal: List[Tuple["Parameter", Optional[ParameterValue]]] = field(default_factory=list)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a dict would be faster for lookup

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need the sequence in which the parameters were processed, i.e. that would require at least OrderedDict. That said, I'm not sure how such a lookup would be realized and also parameters frequently appear more than once, e.g., for fields (which are repeated structure objects)...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are using the journal with reversed, meaning only the last value is actually needed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but then a different name than .journal is also required IMO. (I think that conceptually, the concept of a "change log" for en-/decoding is quite nice.) Performance-wise I don't think that it will matter much because responses containing environment data descriptions are probable quite rare (e.g., I don't have any files which use them and so far we have not gotten any bug reports about missing en-/decoding support.)


def extract_atomic_value(
self,
bit_length: int,
Expand Down
4 changes: 2 additions & 2 deletions odxtools/diagnostictroublecode.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional
from xml.etree import ElementTree

from .element import IdentifiableElement
Expand All @@ -17,7 +17,7 @@ class DiagnosticTroubleCode(IdentifiableElement):
trouble_code: int
text: Optional[str]
display_trouble_code: Optional[str]
level: Union[int, None]
level: Optional[int]
is_temporary_raw: Optional[bool]
sdgs: List[SpecialDataGroup]

Expand Down
39 changes: 26 additions & 13 deletions odxtools/dtcdop.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,22 +130,35 @@ def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
sdgs=[],
)

@override
def encode_into_pdu(self, physical_value: Optional[ParameterValue],
encode_state: EncodeState) -> None:
if isinstance(physical_value, DiagnosticTroubleCode):
trouble_code = physical_value.trouble_code
elif isinstance(physical_value, int):
def numerical_trouble_code(self, dtc_value: ParameterValue) -> int:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

function names usually start with a verb

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you prefer get_numerical_trouble_code(), convert_to_numerical_trouble_code() or something else?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either would be fine

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay, I renamed it to convert_to_numerical_trouble_code()

if isinstance(dtc_value, DiagnosticTroubleCode):
return dtc_value.trouble_code
elif isinstance(dtc_value, int):
# assume that physical value is the trouble_code
trouble_code = physical_value
elif isinstance(physical_value, str):
return dtc_value
elif isinstance(dtc_value, str):
# assume that physical value is the short_name
dtcs = [dtc for dtc in self.dtcs if dtc.short_name == physical_value]
odxassert(len(dtcs) == 1)
trouble_code = dtcs[0].trouble_code
dtcs = [dtc for dtc in self.dtcs if dtc.short_name == dtc_value]
if len(dtcs) != 1:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could it happen that length > 1 ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so: in this case, it would be impossible to decide which of the environment datas that apply for the DTC ought to be used. (I suppose that this constitutes an incorrect PDX file?)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the specs say anything about that?

Copy link
Member Author

@andlaus andlaus Jul 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not aware that it explicitly mentions multiple "normal" environment datas matching the same DTC, but it says that there can only be a single ALL-DATA environment data. Since multiple ALL-DATA environment datas is exactly the same problem as multiple "normal" ones for a given DTC, I infer that this is forbidden. (in which order shall these environment datas be processed anyway?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, sorry: I was in the wrong mental context for my previous two comments (interestingly they are still applicable to some extend): more than one DTC with the same short name cannot happen in valid ODX datasets because short names are required to be unique in their respective context. (if they were not, SNREFs would not work...)

odxraise(f"No DTC named {dtc_value} found for DTC-DOP "
f"{self.short_name}.", EncodeError)
return cast(int, None)

return dtcs[0].trouble_code
else:
raise EncodeError(f"The DTC-DOP {self.short_name} expected a"
f" DiagnosticTroubleCode but got {physical_value!r}.")
odxraise(
f"The DTC-DOP {self.short_name} expected a"
f" diagnostic trouble code but got {type(dtc_value).__name__}", EncodeError)
return cast(int, None)

@override
def encode_into_pdu(self, physical_value: Optional[ParameterValue],
encode_state: EncodeState) -> None:
if physical_value is None:
odxraise(f"No DTC specified", EncodeError)
return

trouble_code = self.numerical_trouble_code(physical_value)

internal_trouble_code = int(self.compu_method.convert_physical_to_internal(trouble_code))

Expand Down
16 changes: 14 additions & 2 deletions odxtools/encodestate.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
# SPDX-License-Identifier: MIT
import warnings
from dataclasses import dataclass, field
from typing import Dict, Optional, SupportsBytes
from typing import TYPE_CHECKING, Dict, List, Optional, SupportsBytes, Tuple

from .exceptions import EncodeError, OdxWarning, odxassert, odxraise
from .odxtypes import AtomicOdxType, DataType
from .odxtypes import AtomicOdxType, DataType, ParameterValue

try:
import bitstruct.c as bitstruct
except ImportError:
import bitstruct

if TYPE_CHECKING:
from .parameters.parameter import Parameter


@dataclass
class EncodeState:
Expand Down Expand Up @@ -56,6 +59,15 @@ class EncodeState:
#: (needed for MinMaxLengthType, EndOfPduField, etc.)
is_end_of_pdu: bool = True

#: list of parameters that have been encoded so far. The journal
#: is used by some types of parameters which depend on the values of
#: other parameters; e.g., environment data description parameters
journal: List[Tuple["Parameter", Optional[ParameterValue]]] = field(default_factory=list)

#: If this is True, specifying unknown parameters for encoding
#: will raise an OdxError exception in strict mode.
allow_unknown_parameters = False

def __post_init__(self) -> None:
# if a coded message has been specified, but no used_mask, we
# assume that all of the bits of the coded message are
Expand Down
148 changes: 134 additions & 14 deletions odxtools/environmentdatadescription.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, cast
from xml.etree import ElementTree

from typing_extensions import override

from .complexdop import ComplexDop
from .decodestate import DecodeState
from .dtcdop import DtcDop
from .encodestate import EncodeState
from .environmentdata import EnvironmentData
from .exceptions import odxraise, odxrequire
from .odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId, OdxLinkRef
from .odxtypes import ParameterValue
from .odxtypes import ParameterValue, ParameterValueDict
from .parameters.parameter import Parameter
from .snrefcontext import SnRefContext
from .utils import dataclass_fields_asdict

Expand All @@ -27,16 +29,26 @@ class EnvironmentDataDescription(ComplexDop):

"""

param_snref: Optional[str]
param_snpathref: Optional[str]

# in ODX 2.0.0, ENV-DATAS seems to be a mandatory
# sub-element of ENV-DATA-DESC, in ODX 2.2 it is not
# present
env_datas: List[EnvironmentData]
env_data_refs: List[OdxLinkRef]
param_snref: Optional[str]
param_snpathref: Optional[str]

def __post_init__(self) -> None:
self.bit_length = None
@property
def param(self) -> Parameter:
# the parameter referenced via SNREF cannot be resolved here
# because the relevant list of parameters depends on the
# concrete codec object processed, whilst an environment data
# description object can be featured in an arbitrary number of
# responses. TODO: resolve the parameter reference in the
# encode() and decode() methods.
odxraise("The parameter of ENV-DATA-DESC objects cannot be resolved "
"because it depends on the context")
return cast(None, Parameter)

@staticmethod
def from_et(et_element: ElementTree.Element,
Expand Down Expand Up @@ -96,17 +108,125 @@ def _resolve_snrefs(self, context: SnRefContext) -> None:
def encode_into_pdu(self, physical_value: Optional[ParameterValue],
encode_state: EncodeState) -> None:
"""Convert a physical value into bytes and emplace them into a PDU.

Since environmental data is supposed to never appear on the
wire, this method just raises an EncodeError exception.
"""
odxraise("EnvironmentDataDescription DOPs cannot be encoded or decoded")

# retrieve the relevant DTC parameter which must be located in
# front of the environment data description.
if self.param_snref is None:
odxraise("Specifying the DTC parameter for environment data "
"descriptions via SNPATHREF is not supported yet")
return None

dtc_param: Optional[Parameter] = None
dtc_dop: Optional[DtcDop] = None
dtc_param_value: Optional[ParameterValue] = None
for prev_param, prev_param_value in reversed(encode_state.journal):
if prev_param.short_name == self.param_snref:
dtc_param = prev_param
prev_dop = getattr(prev_param, "dop", None)
if not isinstance(prev_dop, DtcDop):
odxraise(f"The DOP of the parameter referenced by environment data "
f"descriptions must be a DTC-DOP (is '{type(prev_dop).__name__}')")
return
dtc_dop = prev_dop
dtc_param_value = prev_param_value
break

if dtc_param is None:
odxraise("Environment data description parameters are only allowed following "
"the referenced value parameter.")
return

if dtc_param_value is None or dtc_dop is None:
# this should never happen
odxraise()
return

numerical_dtc = dtc_dop.numerical_trouble_code(dtc_param_value)

# deal with the "all value" environment data. This holds
# parameters that are common to all DTCs. Be aware that the
# specification mandates that there is at most one such
# environment data object
for env_data in self.env_datas:
if env_data.all_value:
tmp = encode_state.allow_unknown_parameters
encode_state.allow_unknown_parameters = True
env_data.encode_into_pdu(physical_value, encode_state)
encode_state.allow_unknown_parameters = tmp
break

# find the environment data corresponding to the given trouble
# code
for env_data in self.env_datas:
if numerical_dtc in env_data.dtc_values:
tmp = encode_state.allow_unknown_parameters
encode_state.allow_unknown_parameters = True
env_data.encode_into_pdu(physical_value, encode_state)
encode_state.allow_unknown_parameters = tmp
break

@override
def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
"""Extract the bytes from a PDU and convert them to a physical value.

Since environmental data is supposed to never appear on the
wire, this method just raises an DecodeError exception.
"""
odxraise("EnvironmentDataDescription DOPs cannot be encoded or decoded")

# retrieve the relevant DTC parameter which must be located in
# front of the environment data description.
if self.param_snref is None:
odxraise("Specifying the DTC parameter for environment data "
"descriptions via SNPATHREF is not supported yet")
return None

dtc_param: Optional[Parameter] = None
dtc_dop: Optional[DtcDop] = None
dtc_param_value: Optional[ParameterValue] = None
for prev_param, prev_param_value in reversed(decode_state.journal):
if prev_param.short_name == self.param_snref:
dtc_param = prev_param
prev_dop = getattr(prev_param, "dop", None)
if not isinstance(prev_dop, DtcDop):
odxraise(f"The DOP of the parameter referenced by environment data "
f"descriptions must be a DTC-DOP (is '{type(prev_dop).__name__}')")
return
dtc_dop = prev_dop
dtc_param_value = prev_param_value
break

if dtc_param is None:
odxraise("Environment data description parameters are only allowed following "
"the referenced value parameter.")
return

if dtc_param_value is None or dtc_dop is None:
# this should never happen
odxraise()
return

numerical_dtc = dtc_dop.numerical_trouble_code(dtc_param_value)

result: ParameterValueDict = {}

# deal with the "all value" environment data. This holds
# parameters that are common to all DTCs. Be aware that the
# specification mandates that there is at most one such
# environment data object
for env_data in self.env_datas:
if env_data.all_value:
tmp = env_data.decode_from_pdu(decode_state)
if not isinstance(tmp, dict):
odxraise()
result.update(tmp)
break

# find the environment data corresponding to the given trouble
# code
for env_data in self.env_datas:
if numerical_dtc in env_data.dtc_values:
tmp = env_data.decode_from_pdu(decode_state)
if not isinstance(tmp, dict):
odxraise()
result.update(tmp)
break

return result
Loading
Loading