Skip to content

Commit

Permalink
diag_layer: allow to categorize services according to their UDS servi…
Browse files Browse the repository at this point in the history
…ce group (SID)

This is a convenience functionality to quickly find the diagnostic
services one is interested in for ECUs offering lots of services. The
simplest way to use it is using the `.print_info()` method:

```python
ecu.uds_service_groups.print_info()
```

That said, bins can also be programmatically processed:

```python
session_services = ecu.uds_service_groups[0x10]
print("The services concerning diagnostic sessions are:")
for session in session_services:
    print(f"  {service.short_name}")
```

Signed-off-by: Andreas Lauser <andreas.lauser@mbition.io>
Signed-off-by: Alexander Walz <alexander.walz@mbition.io>
  • Loading branch information
andlaus committed Oct 20, 2023
1 parent e453456 commit e3fa99c
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 1 deletion.
12 changes: 12 additions & 0 deletions odxtools/diaglayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from .specialdatagroup import SpecialDataGroup
from .statechart import StateChart
from .table import Table
from .udsbinner import UdsBinner
from .unitgroup import UnitGroup
from .unitspec import UnitSpec

Expand Down Expand Up @@ -219,6 +220,17 @@ def _finalize_init(self, odxlinks: OdxLinkDatabase) -> None:
#####
self.diag_layer_raw._resolve_snrefs(self)

#####
# <convenience functionality>
#####
@cached_property
def uds_service_groups(self) -> UdsBinner:
return UdsBinner(self.services)

#####
# </convenience functionality>
#####

#####
# <properties forwarded to the "raw" diag layer>
#####
Expand Down
2 changes: 1 addition & 1 deletion odxtools/parameters/codedconstparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def _resolve_snrefs(self, diag_layer: "DiagLayer") -> None:
super()._resolve_snrefs(diag_layer)

def get_static_bit_length(self) -> Optional[int]:
return getattr(self.diag_coded_type, "bit_length", None)
return self.diag_coded_type.get_static_bit_length()

@property
def internal_data_type(self) -> DataType:
Expand Down
123 changes: 123 additions & 0 deletions odxtools/udsbinner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# SPDX-License-Identifier: MIT
from typing import Dict, Iterable, Optional

from .diagservice import DiagService
from .nameditemlist import NamedItemList
from .parameters.codedconstparameter import CodedConstParameter


class UdsBinner:
"""Class to categorize a long list of services into the service
groups defined by the UDS standard.
This class is supposed to be used like this:
db = odxtools.load_file("my_cool_diagnostics_db.pdx")
...
uds_bins = UdsBinner(db.ecus.my_ecu)
uds_bins.print_info()
"""

def __init__(self, services: Iterable[DiagService]):
service_groups: Dict[Optional[int], NamedItemList[DiagService]] = {}
for service in services:
SID = self.__extract_sid(service)

if SID not in service_groups:
service_groups[SID] = NamedItemList()

service_groups[SID].append(service)

self.service_groups = service_groups

def __extract_sid(self, service: DiagService) -> Optional[int]:
# diagnostic services without requests are possible; just like
# aircraft without wings...
if service.request is None:
return None

prefix = 0 # prefix of constant parameters
cursor = 0 # bit position of the next parameter
for param in service.request.parameters:
if not isinstance(param, CodedConstParameter):
# we *need* at least the first byte of a request to be statically defined!
return None

param_len = param.get_static_bit_length()
if param_len is None:
return None

if not isinstance(param.coded_value, int):
return None

prefix <<= param_len
prefix |= param.coded_value & ((1 << param_len) - 1)
cursor += param_len

if cursor >= 8:
# we have a prefix that is at least 8 bits
# long. return its most significant byte.
prefix >>= cursor - 8
return prefix & 0xff

return None

def print_info(self) -> None:
"""Convenience method to print which of the diagnostic
services are part of which UDS service group.
"""
for sid, service_list in self.service_groups.items():
sid_name = self.sid_to_name.get(sid)
if isinstance(sid, int):
if sid_name is not None:
print(f"UDS service group '{sid_name}' (0x{sid:x}):")
else:
print(f"Non-standard UDS service group 0x{sid:x}:")
else:
print(f"Non standard services:")
for service in service_list:
print(f" {service.short_name}")

#: map from the ID of a service group to its human-readable name
#: as defined by the UDS standard
sid_to_name = {
# diagnostic services that cannot be categorized in terms of
# the UDS standard get `None` as their SID...
None: "Not Applicable",
0x10: "Session",
0x11: "ECU Reset",
0x27: "Security Access",
0x28: "Communication Control",
0x29: "Authentication",
0x3e: "Tester Present",
0x83: "Access Timing Parameter",
0x84: "Secured Data Transmission",
0x85: "Control DTC Settings",
0x86: "Response On Event",
0x87: "Link Control",
0x22: "Read Data By Identifier ",
0x23: "Read Memory By Address",
0x24: "Read Scaling Data By Identifier",
0x2a: "Read Data By Periodic Identifier",
0x2c: "Dynamically Define Data Identifier",
0x2e: "Write Data By Identifier",
0x3d: "Write Memory By Address ",
0x14: "Clear Diagnostic Information",
0x19: "Read DTC Information",
0x2f: "Input Output Control By Identifier",
0x31: "Routine Control",
0x34: "Request Download",
0x35: "Request Upload",
0x36: "Transfer Data",
0x37: "Request Transfer Exit",
0x38: "Request File Transfer",
}

def __getitem__(self, sid: Optional[int]) -> NamedItemList[DiagService]:
if sid is None:
return self.service_groups.get(sid, NamedItemList())

if sid < 0 or sid > 255:
raise AttributeError(f"SIDs must be in range 0x00 to 0xff")

return self.service_groups.get(sid, NamedItemList())
14 changes: 14 additions & 0 deletions tests/test_odxtools.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,20 @@ def test_find_service_by_name(self) -> None:
self.assertEqual(service.odx_id.local_id, "somersault.service.session_start")
self.assertEqual(service.semantic, "SESSION")

def test_udsbinner(self) -> None:
ecu = odxdb.ecus.somersault_lazy

service_groups = ecu.uds_service_groups

self.assertEqual(len(service_groups.service_groups), 4)
self.assertEqual([s.short_name for s in service_groups[0x10]],
["session_start", "session_stop"])
self.assertEqual(service_groups[0x10].session_start.short_name, "session_start")
self.assertEqual([s.short_name for s in service_groups[0xba]], ["do_forward_flips"])
self.assertTrue(0x42 not in service_groups.service_groups)
self.assertEqual(service_groups[0x42], NamedItemList())
self.assertTrue(isinstance(service_groups.sid_to_name[0x10], str))


if __name__ == "__main__":
unittest.main()

0 comments on commit e3fa99c

Please sign in to comment.