diff --git a/src/gallia/command/base.py b/src/gallia/command/base.py index 6a406688d..40d475e09 100644 --- a/src/gallia/command/base.py +++ b/src/gallia/command/base.py @@ -115,6 +115,7 @@ class BaseCommandConfig(GalliaBaseModel, cli_group="generic", config_section="ga model_config = ConfigDict(arbitrary_types_allowed=True) verbose: int = Field(0, description="increase verbosity on the console", short="v") + progress: bool = Field(True, description="Show a progress bar at the bottom of the terminal") trace_log: bool = Field(False, description="set the loglevel of the logfile to TRACE") pre_hook: str | None = Field( None, diff --git a/src/gallia/commands/scan/uds/services.py b/src/gallia/commands/scan/uds/services.py index 40d503fe5..5ed445236 100644 --- a/src/gallia/commands/scan/uds/services.py +++ b/src/gallia/commands/scan/uds/services.py @@ -18,6 +18,7 @@ ) from gallia.services.uds.core.exception import MalformedResponse, UDSException from gallia.services.uds.core.utils import g_repr +from gallia.progress import progress_bar, ProgressSetter logger = get_logger(__name__) @@ -58,7 +59,7 @@ async def main(self) -> None: found: dict[int, dict[int, Any]] = {} if self.config.sessions is None: - found[0] = await self.perform_scan() + found[0] = await self.perform_scan_wrapper(session=None) else: sessions = [ s @@ -89,7 +90,7 @@ async def main(self) -> None: logger.result(f"scanning in session {g_repr(session)}") - found[session] = await self.perform_scan(session) + found[session] = await self.perform_scan_wrapper(session) await self.ecu.leave_session(session, sleep=self.config.power_cycle_sleep) @@ -102,13 +103,10 @@ async def main(self) -> None: except Exception: logger.result(f" [{g_repr(sid)}] vendor specific sid: {data}") - async def perform_scan(self, session: None | int = None) -> dict[int, Any]: + async def perform_scan(self, session: None | int, progress: ProgressSetter) -> dict[int, Any]: result: dict[int, Any] = {} - # Starts at 0x00, see first loop iteration. - sid = -1 - while sid < 0xFF: - sid += 1 + for sid in range(0xFF): if sid & 0x40 and (not self.config.scan_response_ids): continue @@ -118,6 +116,8 @@ async def perform_scan(self, session: None | int = None) -> dict[int, Any]: logger.info(f"{g_repr(sid)}: skipped") continue + progress(sid, 0xff, "scanning services") + if session is not None and self.config.check_session: if not await self.ecu.check_and_set_session(session): logger.error( @@ -140,7 +140,7 @@ async def perform_scan(self, session: None | int = None) -> dict[int, Any]: UDSErrorCodes.serviceNotSupported, UDSErrorCodes.serviceNotSupportedInActiveSession, ]: - logger.info(f"{g_repr(sid)}: not supported [{resp}]") + logger.debug(f"{g_repr(sid)}: not supported [{resp}]") break if isinstance(resp, NegativeResponse) and resp.response_code in [ @@ -153,3 +153,8 @@ async def perform_scan(self, session: None | int = None) -> dict[int, Any]: break return result + + async def perform_scan_wrapper(self, session: None | int) -> dict[int, Any]: + with progress_bar() as p: + return await self.perform_scan(session, p) + diff --git a/src/gallia/progress.py b/src/gallia/progress.py new file mode 100644 index 000000000..8a7694661 --- /dev/null +++ b/src/gallia/progress.py @@ -0,0 +1,91 @@ +# SPDX-FileCopyrightText: AISEC Pentesting Team +# +# SPDX-License-Identifier: Apache-2.0 + +# Adopted from: https://mdk.fr/blog/how-apt-does-its-fancy-progress-bar.html + +import functools +import os +import shutil +from collections.abc import Callable, Iterator +from contextlib import contextmanager + +eprint = functools.partial(print, end="", flush=True) + + +def save_cursor_position() -> None: + eprint("\0337") + + +def restore_cursor_position() -> None: + eprint("\0338") + + +def lock_footer() -> None: + _, lines = shutil.get_terminal_size() + eprint(f"\033[0;{lines-1}r") + + +def unlock_footer() -> None: + _, lines = shutil.get_terminal_size() + eprint(f"\033[0;{lines}r") + + +def move_to_footer() -> None: + _, lines = shutil.get_terminal_size() + eprint(f"\033[{lines};0f") + + +def move_cursor_up() -> None: + eprint("\033[1A") + + +def erase_line() -> None: + eprint("\033[2K") + + +def footer_init() -> None: + # Ensure the last line is available. + eprint("\n") + save_cursor_position() + lock_footer() + restore_cursor_position() + move_cursor_up() + + +def footer_deinit() -> None: + save_cursor_position() + unlock_footer() + move_to_footer() + erase_line() + restore_cursor_position() + + +type ProgressSetter = Callable[[int, int, str], None] + + +@contextmanager +def progress_bar(bar_len: int = 30) -> Iterator[ProgressSetter]: + def set_progress(count: int, total: int, suffix: str = "") -> None: + cols, _ = shutil.get_terminal_size() + filled_len = int(round(bar_len * count / float(total))) + + percents = round(100.0 * count / float(total), 1) + bar = "█" * filled_len + "░" * (bar_len - filled_len) + + content = f"{bar} {percents}% {suffix}" + if len(content) > cols: + content = content[: cols - 1] + "…" + + + save_cursor_position() + move_to_footer() + erase_line() + eprint(content) + restore_cursor_position() + + footer_init() + try: + yield set_progress + finally: + footer_deinit() diff --git a/uv.lock b/uv.lock index 3d45da1c2..ff42f4b05 100644 --- a/uv.lock +++ b/uv.lock @@ -287,7 +287,7 @@ dependencies = [ { name = "zstandard" }, ] -[package.dev-dependencies] +[package.optional-dependencies] dev = [ { name = "construct-typing" }, { name = "mypy" }, @@ -313,31 +313,27 @@ requires-dist = [ { name = "argcomplete", specifier = ">=2,<4" }, { name = "boltons", specifier = ">=24.1.0" }, { name = "construct", specifier = ">=2.10,<3.0" }, + { name = "construct-typing", marker = "extra == 'dev'", specifier = ">=0.5.2,<0.7.0" }, + { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.0,<2.0" }, + { name = "myst-parser", marker = "extra == 'dev'", specifier = ">=3.0.1,<4.1" }, { name = "platformdirs", specifier = ">=2.6,<5.0" }, { name = "pydantic", specifier = ">=2.0,<3.0" }, + { name = "pylsp-mypy", marker = "extra == 'dev'", specifier = ">=0.6,<0.7" }, + { name = "pylsp-rope", marker = "extra == 'dev'", specifier = ">=0.1,<0.2" }, + { name = "pytest", marker = "extra == 'dev'", specifier = ">=7.1,<9.0" }, + { name = "pytest-asyncio", marker = "extra == 'dev'", specifier = ">=0.20,<0.25" }, + { name = "pytest-cov", marker = "extra == 'dev'", specifier = ">=4,<6" }, + { name = "python-lsp-server", marker = "extra == 'dev'", specifier = ">=1.5,<2.0" }, + { name = "reuse", marker = "extra == 'dev'", specifier = ">=4.0,<5.0" }, + { name = "ruff", marker = "extra == 'dev'", specifier = ">=0.8.0" }, + { name = "sphinx", marker = "extra == 'dev'", specifier = ">=8.0" }, + { name = "sphinx-rtd-theme", marker = "extra == 'dev'", specifier = ">=3" }, { name = "tabulate", specifier = ">=0.9" }, + { name = "types-aiofiles", marker = "extra == 'dev'", specifier = ">=23.1,<25.0" }, + { name = "types-tabulate", marker = "extra == 'dev'", specifier = ">=0.9,<0.10" }, { name = "zstandard", specifier = ">=0.19" }, ] -[package.metadata.requires-dev] -dev = [ - { name = "construct-typing", specifier = ">=0.5.2,<0.7.0" }, - { name = "mypy", specifier = ">=1.0,<2.0" }, - { name = "myst-parser", specifier = ">=3.0.1,<4.1" }, - { name = "pylsp-mypy", specifier = ">=0.6,<0.7" }, - { name = "pylsp-rope", specifier = ">=0.1,<0.2" }, - { name = "pytest", specifier = ">=7.1,<9.0" }, - { name = "pytest-asyncio", specifier = ">=0.20,<0.25" }, - { name = "pytest-cov", specifier = ">=4,<6" }, - { name = "python-lsp-server", specifier = ">=1.5,<2.0" }, - { name = "reuse", specifier = ">=4.0,<5.0" }, - { name = "ruff", specifier = ">=0.8.0" }, - { name = "sphinx", specifier = ">=8.0" }, - { name = "sphinx-rtd-theme", specifier = ">=3" }, - { name = "types-aiofiles", specifier = ">=23.1,<25.0" }, - { name = "types-tabulate", specifier = ">=0.9,<0.10" }, -] - [[package]] name = "idna" version = "3.10"