Skip to content

Commit

Permalink
feat: Rework CLI argument parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
rumpelsepp committed Jul 6, 2022
1 parent 44e9612 commit fe5d6c5
Show file tree
Hide file tree
Showing 39 changed files with 399 additions and 235 deletions.
44 changes: 22 additions & 22 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,48 +68,48 @@ jobs:
- name: Spawn vECU
run: |
poetry run gallia vecu "tcp-lines://127.0.0.1:20162" rng --seed 3 --mandatory_sessions "[1, 2, 3]" --mandatory_services "[DiagnosticSessionControl, EcuReset, ReadDataByIdentifier, WriteDataByIdentifier, RoutineControl, SecurityAccess, ReadMemoryByAddress, WriteMemoryByAddress, RequestDownload, RequestUpload, TesterPresent, ReadDTCInformation, ClearDiagnosticInformation, InputOutputControlByIdentifier]" &
poetry run gallia serve vecu "tcp-lines://127.0.0.1:20162" rng --seed 3 --mandatory_sessions "[1, 2, 3]" --mandatory_services "[DiagnosticSessionControl, EcuReset, ReadDataByIdentifier, WriteDataByIdentifier, RoutineControl, SecurityAccess, ReadMemoryByAddress, WriteMemoryByAddress, RequestDownload, RequestUpload, TesterPresent, ReadDTCInformation, ClearDiagnosticInformation, InputOutputControlByIdentifier]" &
- name: Test scan-services
run: |
poetry run gallia scan-services --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap --sessions 1 2 --check-session
poetry run gallia scan uds services --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap --sessions 1 2 --check-session
- name: Test scan-sessions
run: |
poetry run gallia scan-sessions --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap --depth 2
poetry run gallia scan-sessions --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap --fast
poetry run gallia scan uds sessions --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap --depth 2
poetry run gallia scan uds sessions --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap --fast
- name: Test scan-identifiers
run: |
poetry run gallia scan-identifiers --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap --start 0 --end 100 --sid 0x22
poetry run gallia scan-identifiers --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap --start 0 --end 100 --sid 0x2e
poetry run gallia scan-identifiers --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap --start 0 --end 100 --sid 0x31
poetry run gallia scan uds identifiers --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap --start 0 --end 100 --sid 0x22
poetry run gallia sca uds -identifiers --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap --start 0 --end 100 --sid 0x2e
poetry run gallia sca uds -identifiers --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap --start 0 --end 100 --sid 0x31
- name: Test scan-reset
run: |
poetry run gallia scan-reset --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap
poetry run gallia scan uds reset --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap
- name: Test scan-dump-seeds
run: |
poetry run gallia scan-dump-seeds --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap --duration 0.01 --level 0x2f
poetry run gallia scan uds scan dump-seeds --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap --duration 0.01 --level 0x2f
- name: Test scan-memory-functions
run: |
for sid in 0x23 0x34 0x35 0x3d; do
poetry run gallia scan-memory-functions --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap --sid "$sid"
poetry run gallia scan uds memory --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap --sid "$sid"
done
- name: Test UDS primitives
run: |
poetry run gallia simple-ecu-reset --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap
poetry run gallia simple-read-error-log --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap
poetry run gallia simple-get-vin --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap
poetry run gallia simple-ping --ecu-reset --count 2 --target "tcp-lines://127.0.0.1:20162" --no-dumpcap
poetry run gallia simple-read-by-identifier --ecu-reset --data-id 0x108d --target "tcp-lines://127.0.0.1:20162" --no-dumpcap
poetry run gallia simple-send-pdu 1001 --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap
poetry run gallia simple-write-by-identifier --ecu-reset --data-id 0x2266 --data 00 --target "tcp-lines://127.0.0.1:20162" --no-dumpcap
poetry run gallia simple-dtc --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap read
poetry run gallia simple-dtc --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap clear
poetry run gallia simple-dtc --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap control --stop
poetry run gallia simple-dtc --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap control --resume
poetry run gallia simple-iocbi --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap 0x1000 reset-to-default
poetry run gallia prims uds ecu-reset --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap
poetry run gallia prims uds error-log --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap
poetry run gallia prims uds vin --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap
poetry run gallia prims uds ping --ecu-reset --count 2 --target "tcp-lines://127.0.0.1:20162" --no-dumpcap
poetry run gallia prims uds rdbid --ecu-reset --data-id 0x108d --target "tcp-lines://127.0.0.1:20162" --no-dumpcap
poetry run gallia prims uds send-pdu 1001 --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap
poetry run gallia prims uds rdbid --ecu-reset --data-id 0x2266 --data 00 --target "tcp-lines://127.0.0.1:20162" --no-dumpcap
poetry run gallia prims uds dtc --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap read
poetry run gallia prims uds dtc --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap clear
poetry run gallia prims uds dtc --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap control --stop
poetry run gallia prims uds dtc --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap control --resume
poetry run gallia prims uds IOCBI --ecu-reset --target "tcp-lines://127.0.0.1:20162" --no-dumpcap 0x1000 reset-to-default
25 changes: 0 additions & 25 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,31 +58,6 @@ reuse = "^1.0.0"
"discover-xcp" = "gallia.udscan.scanner.find_xcp:main"
"cursed-hr" = "cursed_hr.cursed_hr:main"

[tool.poetry.plugins."gallia_scanners"]
"discover-can-ids" = "gallia.udscan.scanner.find_can_ids:FindCanIDsScanner"
"discover-doip" = "gallia.udscan.scanner.discover_doip:DiscoverDoIP"
"fuzz-payloads" = "gallia.udscan.scanner.fuzz_payloads:FuzzPayloads"
"scan-dump-seeds" = "gallia.udscan.scanner.scan_sa_dump_seeds:SaDumpSeeds"
"scan-identifiers" = "gallia.udscan.scanner.scan_identifiers:ScanIdentifiers"
"scan-memory-functions" = "gallia.udscan.scanner.scan_memory_functions:ScanWriteDataByAddress"
"scan-reset" = "gallia.udscan.scanner.scan_reset:ScanReset"
"scan-services" = "gallia.udscan.scanner.scan_services:ScanServices"
"scan-sessions" = "gallia.udscan.scanner.scan_sessions:IterateSessions"
"simple-dtc" = "gallia.udscan.scanner.simple_dtc:DTCScanner"
"simple-ecu-reset" = "gallia.udscan.scanner.simple_ecu_reset:EcuReset"
"simple-get-vin" = "gallia.udscan.scanner.simple_get_vin:GetVin"
"simple-iocbi" = "gallia.udscan.scanner.simple_iocbi:IOCBI"
"simple-ping" = "gallia.udscan.scanner.simple_ping:Ping"
"simple-read-by-identifier" = "gallia.udscan.scanner.simple_read_by_identifier:ReadByIdentifier"
"simple-read-error-log" = "gallia.udscan.scanner.simple_read_error_log:ReadErrorLog"
"simple-rmba" = "gallia.udscan.scanner.simple_rmba:ReadMemoryByAddressScanner"
"simple-rtcl" = "gallia.udscan.scanner.simple_rtcl:RTCL"
"simple-send-pdu" = "gallia.udscan.scanner.simple_send_pdu:SendPDU"
"simple-test-xcp" = "gallia.udscan.scanner.simple_test_xcp:TestXCP"
"simple-wmba" = "gallia.udscan.scanner.simple_wmba:WriteMemoryByAddressScanner"
"simple-write-by-identifier" = "gallia.udscan.scanner.simple_write_by_identifier:WriteByIdentifier"
"vecu" = "gallia.virtual_ecu:VirtualECU"

[tool.mypy]
disallow_any_unimported = true
check_untyped_defs = true
Expand Down
237 changes: 167 additions & 70 deletions src/gallia/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,86 +2,183 @@
#
# SPDX-License-Identifier: Apache-2.0

# PYTHON_ARGCOMPLETE_OK

import argparse
import os
import sys
from argparse import ArgumentDefaultsHelpFormatter
from importlib.metadata import entry_points
from typing import Any

import argcomplete

from gallia.command import GalliaBase
from gallia.commands.discover.uds.find_can_ids import FindCanIDsScanner
from gallia.commands.prims.uds.simple_dtc import SimpleDTC
from gallia.commands.prims.uds.simple_ecu_reset import SimpleECUReset
from gallia.commands.prims.uds.simple_get_vin import SimpleGetVin
from gallia.commands.prims.uds.simple_iocbi import SimpleIOCBI
from gallia.commands.prims.uds.simple_ping import SimplePing
from gallia.commands.prims.uds.simple_read_by_identifier import ReadByIdentifier
from gallia.commands.prims.uds.simple_read_error_log import SimpleReadErrorLog
from gallia.commands.prims.uds.simple_rmba import SimpleRMBA
from gallia.commands.prims.uds.simple_rtcl import SimpleRTCL
from gallia.commands.prims.uds.simple_send_pdu import SimpleSendPDU
from gallia.commands.prims.uds.simple_wmba import SimpleWMBA
from gallia.commands.prims.uds.simple_write_by_identifier import WriteByIdentifier
from gallia.commands.scan.uds.scan_identifiers import ScanIdentifiers
from gallia.commands.scan.uds.scan_memory_functions import MemoryFunctionsScanner
from gallia.commands.scan.uds.scan_reset import ResetScanner
from gallia.commands.scan.uds.scan_sa_dump_seeds import SaDumpSeeds
from gallia.commands.scan.uds.scan_services import ServicesScanner
from gallia.commands.scan.uds.scan_sessions import ScanSessions
from gallia.commands.scan.uds.fuzz_payloads import FuzzPayloads
from gallia.commands.serve.virtual_ecu import VirtualECU

# from gallia.commands.prims.uds.simple_test_xcp import SimpleTestXCP

registry: list[type[GalliaBase]] = [
# SimpleTestXCP,
FindCanIDsScanner,
FuzzPayloads,
MemoryFunctionsScanner,
ReadByIdentifier,
ResetScanner,
SaDumpSeeds,
ScanIdentifiers,
ScanSessions,
ServicesScanner,
SimpleDTC,
SimpleECUReset,
SimpleGetVin,
SimpleIOCBI,
SimplePing,
SimpleRMBA,
SimpleRTCL,
SimpleReadErrorLog,
SimpleSendPDU,
SimpleWMBA,
VirtualECU,
WriteByIdentifier,
]


def load_plugins() -> None:
eps = entry_points()
if "gallia_commands" in eps:
for entry in eps["gallia_commands"]:
registry.append(entry.load())


def _add_cmd_category(
d: dict[str, Any],
sp: argparse._SubParsersAction,
category: str,
help_str: str,
) -> None:
parser = sp.add_parser(category, help=help_str)
parser.set_defaults(usage_func=parser.print_usage)

d[category] = {}
d[category]["siblings"] = {}
d[category]["parser"] = parser
d[category]["subparsers"] = parser.add_subparsers(metavar="COMMAND")


def build_parsers() -> dict[str, Any]:
parser = argparse.ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.set_defaults(usage_func=parser.print_usage)
subparsers = parser.add_subparsers(metavar="COMMAND", required=True)
parsers: dict[str, Any] = {
"parser": parser,
"subparsers": subparsers,
"siblings": {},
}

_add_cmd_category(
parsers,
subparsers,
"discover",
"find hosts and endpoints",
)
_add_cmd_category(
parsers["discover"]["siblings"],
parsers["discover"]["subparsers"],
"uds",
"Universal Diagnostic Services",
)
_add_cmd_category(
parsers["discover"]["siblings"],
parsers["discover"]["subparsers"],
"xcp",
"Universal Measurement and Calibration Protocol",
)

def main() -> None:
parser = argparse.ArgumentParser()
sp = parser.add_subparsers(
metavar="CHOICE",
_add_cmd_category(
parsers,
subparsers,
"prims",
"protocol specific primitives",
)
_add_cmd_category(
parsers["prims"]["siblings"],
parsers["prims"]["subparsers"],
"uds",
"Universal Diagnostic Services",
)

_add_cmd_category(
parsers,
subparsers,
"scan",
"scan parameters of various network protocols",
)
_add_cmd_category(
parsers["scan"]["siblings"],
parsers["scan"]["subparsers"],
"uds",
"Universal Diagnostic Services",
)

all_entries = entry_points()
_add_cmd_category(
parsers,
subparsers,
"serve",
"utilities to spawn services",
)

if "_ARGCOMPLETE" in os.environ:
comp_line = os.environ["COMP_LINE"]
comp_point = int(os.environ["COMP_POINT"])
return parsers

# For completion only the names are required, which results in a noticeably faster response
# compared to the full loading process, which is necessary for the help
for entry in all_entries["gallia_scanners"]:
_parser = sp.add_parser(entry.name)

if len(comp_line.lstrip().split(" ")) < 3:
argcomplete.autocomplete(parser)
def load_plugin_cli(parsers: dict[str, Any]) -> None:
for cls in registry:
if cls.CATEGORY is not None:
if cls.SUBCATEGORY is not None:
subparsers = parsers[cls.CATEGORY]["siblings"][cls.SUBCATEGORY][
"subparsers"
]
else:
subparsers = parsers[cls.CATEGORY]["subparsers"]
else:
entry_prefix_length = comp_line.find(" ") + 1

os.environ["COMP_LINE"] = comp_line[entry_prefix_length:]
os.environ["COMP_POINT"] = str(comp_point - entry_prefix_length)

chosen_entry = comp_line.split()[1]

try:
entry_point = next(
i.load()
for i in all_entries["gallia_scanners"]
if i.name == chosen_entry
)
sys.exit(entry_point().run())
except StopIteration:
pass
else:
if "gallia_scanners" in all_entries:
for entry in all_entries["gallia_scanners"]:
scanner_class = entry.load()

_parser = sp.add_parser(
entry.name,
help=scanner_class.__doc__,
)
_parser.set_defaults(func=scanner_class().run)

if "gallia_scripts" in all_entries:
for entry in all_entries["gallia_scripts"]:
entry_point = entry.load()

_parser = sp.add_parser(
entry.name,
help=entry_point.__doc__,
)
_parser.set_defaults(func=entry_point)

# Only pass a single argument to the args parser, otherwise it gets confused
# with arguments belonging to one of the subparsers.
if len(sys.argv) < 2:
parser.print_help()
parser.exit()

args = parser.parse_args([sys.argv[1]])

# Combine the first two arguments for subsequent parsers to work correctly
sys.argv[0] += f" {sys.argv[1]}"
sys.argv.pop(1)

if not hasattr(args, "func"):
parser.print_help()
parser.exit()
sys.exit(args.func())
subparsers = parsers["subparsers"]

subparser = subparsers.add_parser(
cls.COMMAND,
description=cls.__doc__,
help=cls.SHORT_HELP,
)
scanner = cls(subparser)
subparser.set_defaults(run_func=scanner.run)


def main() -> None:
load_plugins()
parsers = build_parsers()
load_plugin_cli(parsers)

parser = parsers["parser"]
argcomplete.autocomplete(parser)
args = parser.parse_args()
if not hasattr(args, "run_func"):
args.usage_func()
parser.exit(1)

sys.exit(args.func(args))
Loading

0 comments on commit fe5d6c5

Please sign in to comment.