diff --git a/src/gallia/cli/gallia.py b/src/gallia/cli/gallia.py index 90f764a51..4a5a20cb2 100644 --- a/src/gallia/cli/gallia.py +++ b/src/gallia/cli/gallia.py @@ -185,7 +185,6 @@ def __call__( if setup_log: setup_logging( level=get_log_level(config.verbose), - no_volatile_info=not config.volatile_info, ) sys.exit(get_command(config).entry_point()) diff --git a/src/gallia/command/base.py b/src/gallia/command/base.py index 8731fbe66..40d475e09 100644 --- a/src/gallia/command/base.py +++ b/src/gallia/command/base.py @@ -115,9 +115,7 @@ class BaseCommandConfig(GalliaBaseModel, cli_group="generic", config_section="ga model_config = ConfigDict(arbitrary_types_allowed=True) verbose: int = Field(0, description="increase verbosity on the console", short="v") - volatile_info: bool = Field( - True, description="Overwrite log lines with level info or lower in terminal output" - ) + progress: bool = Field(True, description="Show a progress bar at the bottom of the terminal") trace_log: bool = Field(False, description="set the loglevel of the logfile to TRACE") pre_hook: str | None = Field( None, diff --git a/src/gallia/commands/scan/uds/services.py b/src/gallia/commands/scan/uds/services.py index 40d503fe5..a0031794a 100644 --- a/src/gallia/commands/scan/uds/services.py +++ b/src/gallia/commands/scan/uds/services.py @@ -9,6 +9,7 @@ from gallia.command.config import Field, Ranges, Ranges2D from gallia.command.uds import UDSScannerConfig from gallia.log import get_logger +from gallia.progress import ProgressSetter, progress_bar from gallia.services.uds import ( NegativeResponse, UDSErrorCodes, @@ -58,7 +59,7 @@ async def main(self) -> None: found: dict[int, dict[int, Any]] = {} if self.config.sessions is None: - found[0] = await self.perform_scan() + found[0] = await self.perform_scan_wrapper(session=None) else: sessions = [ s @@ -89,7 +90,7 @@ async def main(self) -> None: logger.result(f"scanning in session {g_repr(session)}") - found[session] = await self.perform_scan(session) + found[session] = await self.perform_scan_wrapper(session) await self.ecu.leave_session(session, sleep=self.config.power_cycle_sleep) @@ -102,13 +103,10 @@ async def main(self) -> None: except Exception: logger.result(f" [{g_repr(sid)}] vendor specific sid: {data}") - async def perform_scan(self, session: None | int = None) -> dict[int, Any]: + async def perform_scan(self, session: None | int, progress: ProgressSetter) -> dict[int, Any]: result: dict[int, Any] = {} - # Starts at 0x00, see first loop iteration. - sid = -1 - while sid < 0xFF: - sid += 1 + for sid in range(0xFF): if sid & 0x40 and (not self.config.scan_response_ids): continue @@ -118,6 +116,8 @@ async def perform_scan(self, session: None | int = None) -> dict[int, Any]: logger.info(f"{g_repr(sid)}: skipped") continue + progress(sid, 0xFF, "scanning services") + if session is not None and self.config.check_session: if not await self.ecu.check_and_set_session(session): logger.error( @@ -140,7 +140,7 @@ async def perform_scan(self, session: None | int = None) -> dict[int, Any]: UDSErrorCodes.serviceNotSupported, UDSErrorCodes.serviceNotSupportedInActiveSession, ]: - logger.info(f"{g_repr(sid)}: not supported [{resp}]") + logger.debug(f"{g_repr(sid)}: not supported [{resp}]") break if isinstance(resp, NegativeResponse) and resp.response_code in [ @@ -153,3 +153,7 @@ async def perform_scan(self, session: None | int = None) -> dict[int, Any]: break return result + + async def perform_scan_wrapper(self, session: None | int) -> dict[int, Any]: + with progress_bar() as p: + return await self.perform_scan(session, p) diff --git a/src/gallia/log.py b/src/gallia/log.py index fdcbdf96f..855108057 100644 --- a/src/gallia/log.py +++ b/src/gallia/log.py @@ -231,7 +231,6 @@ def to_level(self) -> Loglevel: def setup_logging( level: Loglevel | None = None, color_mode: ColorMode = ColorMode.AUTO, - no_volatile_info: bool = False, logger_name: str = "gallia", ) -> None: """Enable and configure gallia's logging system. @@ -270,13 +269,12 @@ def setup_logging( logger.handlers[0].close() logger.removeHandler(logger.handlers[0]) colored = resolve_color_mode(color_mode) - add_stderr_log_handler(logger_name, level, no_volatile_info, colored) + add_stderr_log_handler(logger_name, level, colored) def add_stderr_log_handler( logger_name: str, level: Loglevel, - no_volatile_info: bool, colored: bool, ) -> None: queue: Queue[Any] = Queue() @@ -288,9 +286,6 @@ def add_stderr_log_handler( console_formatter = _ConsoleFormatter() console_formatter.colored = colored - stderr_handler.terminator = "" # We manually handle the terminator while formatting - if no_volatile_info is False: - console_formatter.volatile_info = True stderr_handler.setFormatter(console_formatter) @@ -346,9 +341,9 @@ class _PenlogRecordV2: _PenlogRecord: TypeAlias = _PenlogRecordV2 -def _colorize_msg(data: str, levelno: int) -> tuple[str, int]: +def _colorize_msg(data: str, levelno: int) -> str: if sys.platform == "win32" or not sys.stderr.isatty(): - return data, 0 + return data out = "" match levelno: @@ -373,7 +368,7 @@ def _colorize_msg(data: str, levelno: int) -> tuple[str, int]: out += data out += _Color.RESET.value - return out, len(style) + return out def _format_record( # noqa: PLR0913 @@ -384,12 +379,8 @@ def _format_record( # noqa: PLR0913 tags: list[str] | None, stacktrace: str | None, colored: bool = False, - volatile_info: bool = False, ) -> str: msg = "" - if volatile_info: - msg += "\33[2K" - extra_len = 4 msg += dt.strftime("%b %d %H:%M:%S.%f")[:-3] msg += " " msg += name @@ -398,20 +389,10 @@ def _format_record( # noqa: PLR0913 msg += ": " if colored: - tmp_msg, extra_len_tmp = _colorize_msg(data, levelno) - msg += tmp_msg - extra_len += extra_len_tmp + msg += _colorize_msg(data, levelno) else: msg += data - if volatile_info and levelno <= Loglevel.INFO: - terminal_width, _ = shutil.get_terminal_size() - msg = msg[: terminal_width + extra_len - 1] # Adapt length to invisible ANSI colors - msg += _Color.RESET.value - msg += "\r" - else: - msg += "\n" - if stacktrace is not None: msg += "\n" msg += stacktrace @@ -732,7 +713,6 @@ def format( tags=record.__dict__["tags"] if "tags" in record.__dict__ else None, stacktrace=stacktrace, colored=self.colored, - volatile_info=self.volatile_info, ) diff --git a/src/gallia/progress.py b/src/gallia/progress.py new file mode 100644 index 000000000..784915aee --- /dev/null +++ b/src/gallia/progress.py @@ -0,0 +1,89 @@ +# SPDX-FileCopyrightText: AISEC Pentesting Team +# +# SPDX-License-Identifier: Apache-2.0 + +# Adopted from: https://mdk.fr/blog/how-apt-does-its-fancy-progress-bar.html + +import functools +import shutil +from collections.abc import Callable, Iterator +from contextlib import contextmanager + +eprint = functools.partial(print, end="", flush=True) + + +def save_cursor_position() -> None: + eprint("\0337") + + +def restore_cursor_position() -> None: + eprint("\0338") + + +def lock_footer() -> None: + _, lines = shutil.get_terminal_size() + eprint(f"\033[0;{lines-1}r") + + +def unlock_footer() -> None: + _, lines = shutil.get_terminal_size() + eprint(f"\033[0;{lines}r") + + +def move_to_footer() -> None: + _, lines = shutil.get_terminal_size() + eprint(f"\033[{lines};0f") + + +def move_cursor_up() -> None: + eprint("\033[1A") + + +def erase_line() -> None: + eprint("\033[2K") + + +def footer_init() -> None: + # Ensure the last line is available. + eprint("\n") + save_cursor_position() + lock_footer() + restore_cursor_position() + move_cursor_up() + + +def footer_deinit() -> None: + save_cursor_position() + unlock_footer() + move_to_footer() + erase_line() + restore_cursor_position() + + +type ProgressSetter = Callable[[int, int, str], None] + + +@contextmanager +def progress_bar(bar_len: int = 30) -> Iterator[ProgressSetter]: + def set_progress(count: int, total: int, suffix: str = "") -> None: + cols, _ = shutil.get_terminal_size() + filled_len = int(round(bar_len * count / float(total))) + + percents = round(100.0 * count / float(total), 1) + bar = "█" * filled_len + "░" * (bar_len - filled_len) + + content = f"{bar} {percents}% {suffix}" + if len(content) > cols: + content = content[: cols - 1] + "…" + + save_cursor_position() + move_to_footer() + erase_line() + eprint(content) + restore_cursor_position() + + footer_init() + try: + yield set_progress + finally: + footer_deinit() diff --git a/tests/bats/helpers.bash b/tests/bats/helpers.bash index 3b4f37201..67d9498ed 100644 --- a/tests/bats/helpers.bash +++ b/tests/bats/helpers.bash @@ -13,7 +13,6 @@ common_setup() { setup_gallia_toml() { { echo "[gallia]" - echo "no-volatile-info = true" echo "verbosity = 1" echo "[gallia.scanner]" diff --git a/tests/bats/run_bats.sh b/tests/bats/run_bats.sh index 5a9548d75..9eb818b0c 100755 --- a/tests/bats/run_bats.sh +++ b/tests/bats/run_bats.sh @@ -7,7 +7,6 @@ set -eu gallia script vecu rng "unix-lines:///tmp/vecu.sock" \ - --no-volatile-info \ --seed 3 \ --mandatory-sessions 1 2 3 \ --mandatory-services DiagnosticSessionControl EcuReset ReadDataByIdentifier WriteDataByIdentifier RoutineControl SecurityAccess ReadMemoryByAddress WriteMemoryByAddress RequestDownload RequestUpload TesterPresent ReadDTCInformation ClearDiagnosticInformation InputOutputControlByIdentifier 2>vecu.log &