Skip to content

Commit

Permalink
split find command into 'find' and 'decode' and restore service name …
Browse files Browse the repository at this point in the history
…search functionality without data for find
  • Loading branch information
floroks committed Oct 21, 2023
1 parent e453456 commit bc2a346
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 85 deletions.
129 changes: 129 additions & 0 deletions odxtools/cli/decode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# SPDX-License-Identifier: MIT
import argparse
from typing import Dict, List, Optional

from ..database import Database
from ..diagservice import DiagService
from ..exceptions import odxraise
from ..odxtypes import ParameterValue
from ..singleecujob import SingleEcuJob
from . import _parser_utils

# name of the tool
_odxtools_tool_name_ = "decode"


def get_display_value(v: ParameterValue) -> str:
if isinstance(v, bytes):
return v.hex(" ")
elif isinstance(v, int):
return f"{v} (0x{v:x})"
else:
return str(v)


def print_summary(
odxdb: Database,
ecu_variants: Optional[List[str]] = None,
data: bytes = b'',
decode: bool = False,
) -> None:
ecu_names = ecu_variants if ecu_variants else [ecu.short_name for ecu in odxdb.ecus]
services: Dict[DiagService, List[str]] = {}
for ecu_name in ecu_names:
ecu = odxdb.ecus[ecu_name]
if not ecu:
print(f"The ecu variant '{ecu_name}' could not be found!")
continue
if data:
found_services = ecu._find_services_for_uds(data)
for found_service in found_services:
ecu_names = services.get(found_service, [])
ecu_names.append(ecu_name)
services[found_service] = ecu_names

print(f"Binary data: {data.hex(' ')}")
for service, ecu_names in services.items():
if isinstance(service, DiagService):
print(
f"Decoded by service '{service.short_name}' (decoding ECUs: {', '.join(ecu_names)})"
)
elif isinstance(service, SingleEcuJob):
print(
f"Decoded by single ecu job '{service.short_name}' (decoding ECUs: {', '.join(ecu_names)})"
)
else:
print(f"Decoded by unknown diagnostic communication: '{service.short_name}' "
f"(decoding ECUs: {', '.join(ecu_names)})")

if decode:
if data is None:
odxraise("Data is required for decoding")

decoded = service.decode_message(data)
print(f"Decoded data:")
for param_name, param_value in decoded.param_dict.items():
print(f" {param_name}={get_display_value(param_value)}")


def add_subparser(subparsers: "argparse._SubParsersAction") -> None:
parser = subparsers.add_parser(
"decode",
description="\n".join([
"Decode request by hex-data",
"",
"Examples:",
" For displaying the service associated with the request 10 01 & decoding it:",
" odxtools decode ./path/to/database.pdx -D -d '10 01'",
" For displaying the service associated with the request 10 01, without decoding it:",
" odxtools decode ./path/to/database.pdx -d '10 01'",
" For more information use:",
" odxtools decode -h",
]),
help="Find & print service by hex-data. Can also decode the hex-data to into their named parameters.",
formatter_class=argparse.RawTextHelpFormatter,
)
_parser_utils.add_pdx_argument(parser)

parser.add_argument(
"-v",
"--variants",
nargs=1,
metavar="VARIANT",
required=False,
help="Specifies which ecu variants should be included.",
default="all",
)

parser.add_argument(
"-d",
"--data",
default=None,
metavar="DATA",
required=True,
help="Specify data of hex request",
)

parser.add_argument(
"-D",
"--decode",
action="store_true",
required=False,
help="Decode the given hex data",
)


def hex_to_binary(data_str: str) -> bytes:
return bytes.fromhex(data_str)


def run(args: argparse.Namespace) -> None:
odxdb = _parser_utils.load_file(args)
variants = args.variants

print_summary(
odxdb,
ecu_variants=None if variants == "all" else variants,
data=bytes.fromhex(args.data),
decode=args.decode,
)
90 changes: 23 additions & 67 deletions odxtools/cli/find.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

from ..database import Database
from ..diagservice import DiagService
from ..exceptions import odxraise
from ..odxtypes import ParameterValue
from ..singleecujob import SingleEcuJob
from . import _parser_utils
from ._print_utils import print_diagnostic_service

# name of the tool
_odxtools_tool_name_ = "find"
Expand All @@ -22,28 +22,18 @@ def get_display_value(v: ParameterValue) -> str:
return str(v)


def print_summary(
odxdb: Database,
ecu_variants: Optional[List[str]] = None,
data: bytes = b'',
service_names: Optional[List[str]] = None,
decode: bool = False,
print_params: bool = False,
allow_unknown_bit_lengths: bool = False,
) -> None:
def print_summary(odxdb: Database,
service_names: List[str],
ecu_variants: Optional[List[str]] = None,
allow_unknown_bit_lengths: bool = False,
print_params: bool = False) -> None:
ecu_names = ecu_variants if ecu_variants else [ecu.short_name for ecu in odxdb.ecus]
services: Dict[DiagService, List[str]] = {}
for ecu_name in ecu_names:
ecu = odxdb.ecus[ecu_name]
if not ecu:
print(f"The ecu variant '{ecu_name}' could not be found!")
continue
if data:
found_services = ecu._find_services_for_uds(data)
for found_service in found_services:
ecu_names = services.get(found_service, [])
ecu_names.append(ecu_name)
services[found_service] = ecu_names

if service_names:
for service_name_search in service_names:
Expand All @@ -53,28 +43,25 @@ def print_summary(
ecu_names.append(ecu_name)
services[service] = ecu_names

print(f"Binary data: {data.hex(' ')}")
for service, ecu_names in services.items():
display_names = ", ".join(ecu_names)
filler = str.ljust("", len(display_names), "=")
print(f"\n{filler}")
print(f"{', '.join(ecu_names)}")
print(f"{filler}\n\n")
if isinstance(service, DiagService):
print(
f"Decoded by service '{service.short_name}' (decoding ECUs: {', '.join(ecu_names)})"
print_diagnostic_service(
service,
print_params=print_params,
allow_unknown_bit_lengths=allow_unknown_bit_lengths,
print_pre_condition_states=True,
print_state_transitions=True,
print_audiences=True,
)
elif isinstance(service, SingleEcuJob):
print(
f"Decoded by single ecu job '{service.short_name}' (decoding ECUs: {', '.join(ecu_names)})"
)
print(f"SingleEcuJob: {service.odx_id}")
else:
print(f"Decoded by unknown diagnostic communication: '{service.short_name}' "
f"(decoding ECUs: {', '.join(ecu_names)})")

if decode:
if data is None:
odxraise("Data is required for decoding")

decoded = service.decode_message(data)
print(f"Decoded data:")
for param_name, param_value in decoded.param_dict.items():
print(f" {param_name}={get_display_value(param_value)}")
print(f"Unknown service: {service}")


def add_subparser(subparsers: "argparse._SubParsersAction") -> None:
Expand All @@ -84,10 +71,6 @@ def add_subparser(subparsers: "argparse._SubParsersAction") -> None:
"Find & print services by hex-data, or name, can also decodes requests",
"",
"Examples:",
" For displaying the service associated with the request 10 01:",
" odxtools find ./path/to/database.pdx -d 10 01",
" For displaying the service associated with the request 10 01, and decoding it:",
" odxtools find ./path/to/database.pdx -D 10 01",
" For displaying the services associated with the partial name 'Reset' without details:",
' odxtools find ./path/to/database.pdx -s "Reset" --no-details',
" For more information use:",
Expand All @@ -108,39 +91,20 @@ def add_subparser(subparsers: "argparse._SubParsersAction") -> None:
default="all",
)

parser.add_argument(
"-d",
"--data",
nargs=1,
default=None,
metavar="DATA",
required=True,
help="Print a list of diagnostic services associated with the hex request.",
)

parser.add_argument(
"-D",
"--decode",
default=False,
action="store_true",
required=False,
help="Decode the specified hex request.",
)

parser.add_argument(
"-s",
"--service-names",
nargs="*",
default=None,
metavar="SERVICES",
required=False,
required=True,
help="Print a list of diagnostic services partially matching given service names",
)

parser.add_argument(
"-nd",
"--no-details",
action="store_false",
action="store_true",
required=False,
help="Don't show all service details",
)
Expand All @@ -154,22 +118,14 @@ def add_subparser(subparsers: "argparse._SubParsersAction") -> None:
)


def hex_to_binary(data_str: str) -> bytes:
return bytes.fromhex(data_str)


def run(args: argparse.Namespace) -> None:
odxdb = _parser_utils.load_file(args)
variants = args.variants
data = bytes.fromhex(args.data[0])
decode = args.decode

print_summary(
odxdb,
ecu_variants=None if variants == "all" else variants,
data=data,
decode=decode,
service_names=args.service_names,
print_params=args.no_details,
print_params=not args.no_details,
allow_unknown_bit_lengths=args.relaxed_output,
)
2 changes: 1 addition & 1 deletion odxtools/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# import the tool modules which can be loaded. if a tool
# can't be loaded, add a dummy one
tool_modules: List[Any] = []
for tool_name in ["list", "browse", "snoop", "find"]:
for tool_name in ["list", "browse", "snoop", "find", "decode"]:
try:
tool_modules.append(importlib.import_module(f".{tool_name}", package="odxtools.cli"))
except Exception as e:
Expand Down
48 changes: 31 additions & 17 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from argparse import Namespace
from typing import List, Optional

import odxtools.cli.decode as decode
import odxtools.cli.find as find
import odxtools.cli.list as list_tool

Expand Down Expand Up @@ -37,21 +38,29 @@ def run_list_tool(path_to_pdx_file: str = "./examples/somersault.pdx",
list_tool.run(list_args)

@staticmethod
def run_find_tool(data: str,
def run_decode_tool(
data: str,
decode_data: bool = False,
path_to_pdx_file: str = "./examples/somersault.pdx",
ecu_variants: Optional[List[str]] = None,
) -> None:
decode_args = Namespace(
pdx_file=path_to_pdx_file, variants=ecu_variants, data=data, decode=decode_data)

decode.run(decode_args)

@staticmethod
def run_find_tool(service_names: List[str],
path_to_pdx_file: str = "./examples/somersault.pdx",
ecu_variants: Optional[List[str]] = None,
decode: bool = False,
ecu_services: Optional[List[str]] = None,
no_details: bool = True,
relaxed_output: bool = False) -> None:
allow_unknown_bit_lengths: bool = False,
no_details: bool = False) -> None:
find_args = Namespace(
pdx_file=path_to_pdx_file,
variants=ecu_variants,
data=[data],
decode=decode,
service_names=ecu_services,
no_details=no_details,
relaxed_output=relaxed_output)
service_names=service_names,
relaxed_output=allow_unknown_bit_lengths,
no_details=no_details)

find.run(find_args)

Expand All @@ -68,14 +77,19 @@ def test_list_tool(self) -> None:
UtilFunctions.run_list_tool(print_all=True)
UtilFunctions.run_list_tool(ecu_services=["session_start"])

def test_find_tool(self) -> None:
def test_decode_tool(self) -> None:

UtilFunctions.run_decode_tool(data="3E00")
UtilFunctions.run_decode_tool(data="3e00")
UtilFunctions.run_decode_tool(data="3E 00", decode_data=True)
UtilFunctions.run_decode_tool(data="3E 00", ecu_variants=["somersault_lazy"])
UtilFunctions.run_decode_tool(data="3E 00")

UtilFunctions.run_find_tool(data="3E00", ecu_services=["session_start"])
UtilFunctions.run_find_tool(data="3e00", ecu_services=["session_start"], no_details=False)
UtilFunctions.run_find_tool(data="3E 00")
UtilFunctions.run_find_tool(data="3E 00", ecu_variants=["somersault_lazy"])
UtilFunctions.run_find_tool(data="3E 00", decode=True)
UtilFunctions.run_find_tool(data="3E 00", decode=True, relaxed_output=True)
def test_find_tool(self) -> None:
UtilFunctions.run_find_tool(service_names=["headstand"])
UtilFunctions.run_find_tool(service_names=["headstand"], allow_unknown_bit_lengths=True)
UtilFunctions.run_find_tool(
service_names=["headstand"], allow_unknown_bit_lengths=True, no_details=True)

@unittest.skipIf(import_failed, "import of PyInquirer failed")
def test_browse_tool(self) -> None:
Expand Down

0 comments on commit bc2a346

Please sign in to comment.