Skip to content

Commit

Permalink
Merge pull request #226 from andlaus/uds_binner
Browse files Browse the repository at this point in the history
diag_layer: allow to access services according to their UDS service group (SID)
  • Loading branch information
andlaus authored Oct 23, 2023
2 parents 41605c8 + 6bebe4f commit 128c323
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 2 deletions.
12 changes: 12 additions & 0 deletions odxtools/diaglayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .parentref import ParentRef
from .request import Request
from .response import Response
from .servicebinner import ServiceBinner
from .singleecujob import SingleEcuJob
from .specialdatagroup import SpecialDataGroup
from .statechart import StateChart
Expand Down Expand Up @@ -219,6 +220,17 @@ def _finalize_init(self, odxlinks: OdxLinkDatabase) -> None:
#####
self.diag_layer_raw._resolve_snrefs(self)

#####
# <convenience functionality>
#####
@cached_property
def service_groups(self) -> ServiceBinner:
return ServiceBinner(self.services)

#####
# </convenience functionality>
#####

#####
# <properties forwarded to the "raw" diag layer>
#####
Expand Down
5 changes: 4 additions & 1 deletion odxtools/leadinglengthinfotype.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from typing import Any, Tuple
from typing import Any, Optional, Tuple

from .decodestate import DecodeState
from .diagcodedtype import DctType, DiagCodedType
Expand Down Expand Up @@ -30,6 +30,9 @@ def __post_init__(self) -> None:
def dct_type(self) -> DctType:
return "LEADING-LENGTH-INFO-TYPE"

def get_static_bit_length(self) -> Optional[int]:
return self.bit_length

def convert_internal_to_bytes(self, internal_value: Any, encode_state: EncodeState,
bit_position: int) -> bytes:

Expand Down
25 changes: 25 additions & 0 deletions odxtools/obd.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# SPDX-License-Identifier: MIT
from enum import IntEnum
from typing import Optional


class SID(IntEnum):
Expand Down Expand Up @@ -30,3 +31,27 @@ class SID(IntEnum):
ControlOperationOfOnboardComponentOrSystem = 0x08
RequestVehicleInformation = 0x09
PermanentDiagnosticTroubleCodes = 0x0A


_sid_to_name = {
0x01: "Show Current Data",
0x02: "Show Freeze Frame Data",
0x03: "Show Stored Diagnostic Trouble Codes",
0x04: "Clear Diagnostic Trouble Codes and Stored Values",
0x05: "Test Results, non-CAN Oxygen Sensor Monitoring",
0x06: "Test Results, CAN Oxygen Sensor Monitoring",
0x07: "Show Pending Diagnostic Trouble Codes",
0x08: "Control Operation of Onboard Component or System",
0x09: "Request Vehicle Information",
0x0A: "Permanent Diagnostic Trouble Codes",
}


def sid_to_name(sid: int) -> Optional[str]:
if sid in _sid_to_name:
return _sid_to_name[sid]

return None


# TODO: PIDs
2 changes: 1 addition & 1 deletion odxtools/parameters/codedconstparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def _resolve_snrefs(self, diag_layer: "DiagLayer") -> None:
super()._resolve_snrefs(diag_layer)

def get_static_bit_length(self) -> Optional[int]:
return getattr(self.diag_coded_type, "bit_length", None)
return self.diag_coded_type.get_static_bit_length()

@property
def internal_data_type(self) -> DataType:
Expand Down
109 changes: 109 additions & 0 deletions odxtools/servicebinner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# SPDX-License-Identifier: MIT
from io import StringIO
from typing import Dict, Iterable, Iterator, Optional

from . import obd, uds
from .diagservice import DiagService
from .nameditemlist import NamedItemList
from .parameters.codedconstparameter import CodedConstParameter


class ServiceBinner:
"""Class to categorize a long list of services into the service
groups defined by the UDS or OBD standards.
This class is supposed to be used like this:
db = odxtools.load_file("my_cool_diagnostics_db.pdx")
...
service_groups = ServiceBinner(db.ecus.my_ecu.services)
print(service_groups)
"""

def __init__(self, services: Iterable[DiagService]):
service_groups: Dict[Optional[int], NamedItemList[DiagService]] = {}
for service in services:
SID = self.__extract_sid(service)

if SID not in service_groups:
service_groups[SID] = NamedItemList()

service_groups[SID].append(service)

self._service_groups = service_groups

def __extract_sid(self, service: DiagService) -> Optional[int]:
# diagnostic services without requests are possible; just like
# aircraft without wings...
if service.request is None:
return None

prefix = 0 # prefix of constant parameters
cursor = 0 # bit position of the next parameter
for param in service.request.parameters:
if not isinstance(param, CodedConstParameter):
# we *need* at least the first byte of a request to be statically defined!
return None

param_len = param.get_static_bit_length()
if param_len is None:
return None

if not isinstance(param.coded_value, int):
return None

prefix <<= param_len
prefix |= param.coded_value & ((1 << param_len) - 1)
cursor += param_len

if cursor >= 8:
# we have a prefix that is at least 8 bits
# long. return its most significant byte.
prefix >>= cursor - 8
return prefix & 0xff

return None

def __str__(self) -> str:
"""Return an informative string about which of the diagnostic
services are part of which UDS or OBD service group.
"""
result = StringIO()
for sid, service_list in self._service_groups.items():
if isinstance(sid, int):
sid_name = obd.sid_to_name(sid)
if sid_name is not None:
print(f"OBD service group '{sid_name}' (0x{sid:x}):", file=result)
elif (sid_name := uds.sid_to_name(sid)) is not None:
print(f"UDS service group '{sid_name}' (0x{sid:x}):", file=result)
else:
print(f"Unknown service group 0x{sid:x}:", file=result)
else:
print(f"Non-standard services:", file=result)
for service in service_list:
print(f" {service.short_name}", file=result)

return result.getvalue()

def __repr__(self) -> str:
"""Return an string representing the object
"""
result = StringIO()
result.write("[ ")
result.write(", ".join([f"0x{x}" for x in self._service_groups if x is not None]))
result.write(" ]")

return result.getvalue()

def __iter__(self) -> Iterator[Optional[int]]:
return iter(self._service_groups)

def __getitem__(self, sid: Optional[int]) -> NamedItemList[DiagService]:
if sid is None:
return self._service_groups.get(sid, NamedItemList())

if sid < 0 or sid > 255:
raise IndexError(f"SIDs must be in range 0x00 to 0xff")

return self._service_groups.get(sid, NamedItemList())
43 changes: 43 additions & 0 deletions odxtools/uds.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,53 @@ class UDSSID(IntEnum):
# 0xBA..0xBE: Reserved for ECU specific services


_sid_to_name = {
0x10: "Diagnostic Session Control",
0x11: "ECU Reset",
0x27: "Security Access",
0x28: "Communication Control",
0x29: "Authentication",
0x3e: "Tester Present",
0x83: "Access Timing Parameter",
0x84: "Secured Data Transmission",
0x85: "Control DTC Settings",
0x86: "Response on Event",
0x87: "Link Control",
0x22: "Read Data by Identifier ",
0x23: "Read Memory by Address",
0x24: "Read Scaling Data by Identifier",
0x2a: "Read Data by Periodic Identifier",
0x2c: "Dynamically Define Data Identifier",
0x2e: "Write Data by Identifier",
0x3d: "Write Memory by Address ",
0x14: "Clear Diagnostic Information",
0x19: "Read DTC Information",
0x2f: "Input Output Control by Identifier",
0x31: "Routine Control",
0x34: "Request Download",
0x35: "Request Upload",
0x36: "Transfer Data",
0x37: "Request Transfer Exit",
0x38: "Request File Transfer",
}

# add the OBD SIDs to the ones from UDS
SID = IntEnum("UdsSID", ((i.name, i.value) for i in chain(obd.SID, UDSSID))) # type: ignore[misc]


def sid_to_name(sid: int) -> Optional[str]:
if sid in _sid_to_name:
return _sid_to_name[sid]
elif 0x81 <= sid and sid <= 0x82:
return "KWP2000 Communication on K-Line"
elif 0xa0 <= sid and sid <= 0xb9:
return "OEM Specific"
elif 0xba <= sid and sid <= 0xbe:
return "ECU Specific"

return None


class NegativeResponseCodes(IntEnum):
"""The standardized negative response codes of UDS.
Expand Down
14 changes: 14 additions & 0 deletions tests/test_odxtools.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,20 @@ def test_find_service_by_name(self) -> None:
self.assertEqual(service.odx_id.local_id, "somersault.service.session_start")
self.assertEqual(service.semantic, "SESSION")

def test_udsbinner(self) -> None:
ecu = odxdb.ecus.somersault_lazy

service_groups = ecu.service_groups

self.assertEqual(len(service_groups._service_groups), 4)
self.assertEqual([s.short_name for s in service_groups[0x10]],
["session_start", "session_stop"])
self.assertEqual(service_groups[0x10].session_start.short_name, "session_start")
self.assertEqual([s.short_name for s in service_groups[0xba]], ["do_forward_flips"])
self.assertTrue(0x10 in service_groups)
self.assertTrue(0x42 not in service_groups)
self.assertEqual(service_groups[0x42], NamedItemList())


if __name__ == "__main__":
unittest.main()

0 comments on commit 128c323

Please sign in to comment.