diff --git a/odxtools/cli/decode.py b/odxtools/cli/decode.py new file mode 100644 index 00000000..2f6bd200 --- /dev/null +++ b/odxtools/cli/decode.py @@ -0,0 +1,129 @@ +# SPDX-License-Identifier: MIT +import argparse +from typing import Dict, List, Optional + +from ..database import Database +from ..diagservice import DiagService +from ..exceptions import odxraise +from ..odxtypes import ParameterValue +from ..singleecujob import SingleEcuJob +from . import _parser_utils + +# name of the tool +_odxtools_tool_name_ = "decode" + + +def get_display_value(v: ParameterValue) -> str: + if isinstance(v, bytes): + return v.hex(" ") + elif isinstance(v, int): + return f"{v} (0x{v:x})" + else: + return str(v) + + +def print_summary( + odxdb: Database, + ecu_variants: Optional[List[str]] = None, + data: bytes = b'', + decode: bool = False, +) -> None: + ecu_names = ecu_variants if ecu_variants else [ecu.short_name for ecu in odxdb.ecus] + services: Dict[DiagService, List[str]] = {} + for ecu_name in ecu_names: + ecu = odxdb.ecus[ecu_name] + if not ecu: + print(f"The ecu variant '{ecu_name}' could not be found!") + continue + if data: + found_services = ecu._find_services_for_uds(data) + for found_service in found_services: + ecu_names = services.get(found_service, []) + ecu_names.append(ecu_name) + services[found_service] = ecu_names + + print(f"Binary data: {data.hex(' ')}") + for service, ecu_names in services.items(): + if isinstance(service, DiagService): + print( + f"Decoded by service '{service.short_name}' (decoding ECUs: {', '.join(ecu_names)})" + ) + elif isinstance(service, SingleEcuJob): + print( + f"Decoded by single ecu job '{service.short_name}' (decoding ECUs: {', '.join(ecu_names)})" + ) + else: + print(f"Decoded by unknown diagnostic communication: '{service.short_name}' " + f"(decoding ECUs: {', '.join(ecu_names)})") + + if decode: + if data is None: + odxraise("Data is required for decoding") + + decoded = service.decode_message(data) + print(f"Decoded data:") + for param_name, param_value in decoded.param_dict.items(): + print(f" {param_name}={get_display_value(param_value)}") + + +def add_subparser(subparsers: "argparse._SubParsersAction") -> None: + parser = subparsers.add_parser( + "decode", + description="\n".join([ + "Decode request by hex-data", + "", + "Examples:", + " For displaying the service associated with the request 10 01 & decoding it:", + " odxtools decode ./path/to/database.pdx -D -d '10 01'", + " For displaying the service associated with the request 10 01, without decoding it:", + " odxtools decode ./path/to/database.pdx -d '10 01'", + " For more information use:", + " odxtools decode -h", + ]), + help="Find & print service by hex-data. Can also decode the hex-data to into their named parameters.", + formatter_class=argparse.RawTextHelpFormatter, + ) + _parser_utils.add_pdx_argument(parser) + + parser.add_argument( + "-v", + "--variants", + nargs=1, + metavar="VARIANT", + required=False, + help="Specifies which ecu variants should be included.", + default="all", + ) + + parser.add_argument( + "-d", + "--data", + default=None, + metavar="DATA", + required=True, + help="Specify data of hex request", + ) + + parser.add_argument( + "-D", + "--decode", + action="store_true", + required=False, + help="Decode the given hex data", + ) + + +def hex_to_binary(data_str: str) -> bytes: + return bytes.fromhex(data_str) + + +def run(args: argparse.Namespace) -> None: + odxdb = _parser_utils.load_file(args) + variants = args.variants + + print_summary( + odxdb, + ecu_variants=None if variants == "all" else variants, + data=bytes.fromhex(args.data), + decode=args.decode, + ) diff --git a/odxtools/cli/find.py b/odxtools/cli/find.py index 6e13cb2f..384228ec 100644 --- a/odxtools/cli/find.py +++ b/odxtools/cli/find.py @@ -4,10 +4,10 @@ from ..database import Database from ..diagservice import DiagService -from ..exceptions import odxraise from ..odxtypes import ParameterValue from ..singleecujob import SingleEcuJob from . import _parser_utils +from ._print_utils import print_diagnostic_service # name of the tool _odxtools_tool_name_ = "find" @@ -22,15 +22,11 @@ def get_display_value(v: ParameterValue) -> str: return str(v) -def print_summary( - odxdb: Database, - ecu_variants: Optional[List[str]] = None, - data: bytes = b'', - service_names: Optional[List[str]] = None, - decode: bool = False, - print_params: bool = False, - allow_unknown_bit_lengths: bool = False, -) -> None: +def print_summary(odxdb: Database, + service_names: List[str], + ecu_variants: Optional[List[str]] = None, + allow_unknown_bit_lengths: bool = False, + print_params: bool = False) -> None: ecu_names = ecu_variants if ecu_variants else [ecu.short_name for ecu in odxdb.ecus] services: Dict[DiagService, List[str]] = {} for ecu_name in ecu_names: @@ -38,12 +34,6 @@ def print_summary( if not ecu: print(f"The ecu variant '{ecu_name}' could not be found!") continue - if data: - found_services = ecu._find_services_for_uds(data) - for found_service in found_services: - ecu_names = services.get(found_service, []) - ecu_names.append(ecu_name) - services[found_service] = ecu_names if service_names: for service_name_search in service_names: @@ -53,28 +43,25 @@ def print_summary( ecu_names.append(ecu_name) services[service] = ecu_names - print(f"Binary data: {data.hex(' ')}") for service, ecu_names in services.items(): + display_names = ", ".join(ecu_names) + filler = str.ljust("", len(display_names), "=") + print(f"\n{filler}") + print(f"{', '.join(ecu_names)}") + print(f"{filler}\n\n") if isinstance(service, DiagService): - print( - f"Decoded by service '{service.short_name}' (decoding ECUs: {', '.join(ecu_names)})" + print_diagnostic_service( + service, + print_params=print_params, + allow_unknown_bit_lengths=allow_unknown_bit_lengths, + print_pre_condition_states=True, + print_state_transitions=True, + print_audiences=True, ) elif isinstance(service, SingleEcuJob): - print( - f"Decoded by single ecu job '{service.short_name}' (decoding ECUs: {', '.join(ecu_names)})" - ) + print(f"SingleEcuJob: {service.odx_id}") else: - print(f"Decoded by unknown diagnostic communication: '{service.short_name}' " - f"(decoding ECUs: {', '.join(ecu_names)})") - - if decode: - if data is None: - odxraise("Data is required for decoding") - - decoded = service.decode_message(data) - print(f"Decoded data:") - for param_name, param_value in decoded.param_dict.items(): - print(f" {param_name}={get_display_value(param_value)}") + print(f"Unknown service: {service}") def add_subparser(subparsers: "argparse._SubParsersAction") -> None: @@ -84,10 +71,6 @@ def add_subparser(subparsers: "argparse._SubParsersAction") -> None: "Find & print services by hex-data, or name, can also decodes requests", "", "Examples:", - " For displaying the service associated with the request 10 01:", - " odxtools find ./path/to/database.pdx -d 10 01", - " For displaying the service associated with the request 10 01, and decoding it:", - " odxtools find ./path/to/database.pdx -D 10 01", " For displaying the services associated with the partial name 'Reset' without details:", ' odxtools find ./path/to/database.pdx -s "Reset" --no-details', " For more information use:", @@ -108,39 +91,20 @@ def add_subparser(subparsers: "argparse._SubParsersAction") -> None: default="all", ) - parser.add_argument( - "-d", - "--data", - nargs=1, - default=None, - metavar="DATA", - required=True, - help="Print a list of diagnostic services associated with the hex request.", - ) - - parser.add_argument( - "-D", - "--decode", - default=False, - action="store_true", - required=False, - help="Decode the specified hex request.", - ) - parser.add_argument( "-s", "--service-names", nargs="*", default=None, metavar="SERVICES", - required=False, + required=True, help="Print a list of diagnostic services partially matching given service names", ) parser.add_argument( "-nd", "--no-details", - action="store_false", + action="store_true", required=False, help="Don't show all service details", ) @@ -154,22 +118,14 @@ def add_subparser(subparsers: "argparse._SubParsersAction") -> None: ) -def hex_to_binary(data_str: str) -> bytes: - return bytes.fromhex(data_str) - - def run(args: argparse.Namespace) -> None: odxdb = _parser_utils.load_file(args) variants = args.variants - data = bytes.fromhex(args.data[0]) - decode = args.decode print_summary( odxdb, ecu_variants=None if variants == "all" else variants, - data=data, - decode=decode, service_names=args.service_names, - print_params=args.no_details, + print_params=not args.no_details, allow_unknown_bit_lengths=args.relaxed_output, ) diff --git a/odxtools/cli/main.py b/odxtools/cli/main.py index 7d6dfba8..caf35c93 100644 --- a/odxtools/cli/main.py +++ b/odxtools/cli/main.py @@ -9,7 +9,7 @@ # import the tool modules which can be loaded. if a tool # can't be loaded, add a dummy one tool_modules: List[Any] = [] -for tool_name in ["list", "browse", "snoop", "find"]: +for tool_name in ["list", "browse", "snoop", "find", "decode"]: try: tool_modules.append(importlib.import_module(f".{tool_name}", package="odxtools.cli")) except Exception as e: diff --git a/tests/test_cli.py b/tests/test_cli.py index 3e85a60f..8bd7ba62 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -4,6 +4,7 @@ from argparse import Namespace from typing import List, Optional +import odxtools.cli.decode as decode import odxtools.cli.find as find import odxtools.cli.list as list_tool @@ -37,21 +38,29 @@ def run_list_tool(path_to_pdx_file: str = "./examples/somersault.pdx", list_tool.run(list_args) @staticmethod - def run_find_tool(data: str, + def run_decode_tool( + data: str, + decode_data: bool = False, + path_to_pdx_file: str = "./examples/somersault.pdx", + ecu_variants: Optional[List[str]] = None, + ) -> None: + decode_args = Namespace( + pdx_file=path_to_pdx_file, variants=ecu_variants, data=data, decode=decode_data) + + decode.run(decode_args) + + @staticmethod + def run_find_tool(service_names: List[str], path_to_pdx_file: str = "./examples/somersault.pdx", ecu_variants: Optional[List[str]] = None, - decode: bool = False, - ecu_services: Optional[List[str]] = None, - no_details: bool = True, - relaxed_output: bool = False) -> None: + allow_unknown_bit_lengths: bool = False, + no_details: bool = False) -> None: find_args = Namespace( pdx_file=path_to_pdx_file, variants=ecu_variants, - data=[data], - decode=decode, - service_names=ecu_services, - no_details=no_details, - relaxed_output=relaxed_output) + service_names=service_names, + relaxed_output=allow_unknown_bit_lengths, + no_details=no_details) find.run(find_args) @@ -68,14 +77,19 @@ def test_list_tool(self) -> None: UtilFunctions.run_list_tool(print_all=True) UtilFunctions.run_list_tool(ecu_services=["session_start"]) - def test_find_tool(self) -> None: + def test_decode_tool(self) -> None: + + UtilFunctions.run_decode_tool(data="3E00") + UtilFunctions.run_decode_tool(data="3e00") + UtilFunctions.run_decode_tool(data="3E 00", decode_data=True) + UtilFunctions.run_decode_tool(data="3E 00", ecu_variants=["somersault_lazy"]) + UtilFunctions.run_decode_tool(data="3E 00") - UtilFunctions.run_find_tool(data="3E00", ecu_services=["session_start"]) - UtilFunctions.run_find_tool(data="3e00", ecu_services=["session_start"], no_details=False) - UtilFunctions.run_find_tool(data="3E 00") - UtilFunctions.run_find_tool(data="3E 00", ecu_variants=["somersault_lazy"]) - UtilFunctions.run_find_tool(data="3E 00", decode=True) - UtilFunctions.run_find_tool(data="3E 00", decode=True, relaxed_output=True) + def test_find_tool(self) -> None: + UtilFunctions.run_find_tool(service_names=["headstand"]) + UtilFunctions.run_find_tool(service_names=["headstand"], allow_unknown_bit_lengths=True) + UtilFunctions.run_find_tool( + service_names=["headstand"], allow_unknown_bit_lengths=True, no_details=True) @unittest.skipIf(import_failed, "import of PyInquirer failed") def test_browse_tool(self) -> None: