Skip to content

Commit

Permalink
WIP Progress Bar draft
Browse files Browse the repository at this point in the history
  • Loading branch information
rumpelsepp committed Dec 9, 2024
1 parent 4729eb0 commit 28c2beb
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 28 deletions.
1 change: 1 addition & 0 deletions src/gallia/command/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class BaseCommandConfig(GalliaBaseModel, cli_group="generic", config_section="ga
model_config = ConfigDict(arbitrary_types_allowed=True)

verbose: int = Field(0, description="increase verbosity on the console", short="v")
progress: bool = Field(True, description="Show a progress bar at the bottom of the terminal")
trace_log: bool = Field(False, description="set the loglevel of the logfile to TRACE")
pre_hook: str | None = Field(
None,
Expand Down
21 changes: 13 additions & 8 deletions src/gallia/commands/scan/uds/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
)
from gallia.services.uds.core.exception import MalformedResponse, UDSException
from gallia.services.uds.core.utils import g_repr
from gallia.progress import progress_bar, ProgressSetter

logger = get_logger(__name__)

Expand Down Expand Up @@ -58,7 +59,7 @@ async def main(self) -> None:
found: dict[int, dict[int, Any]] = {}

if self.config.sessions is None:
found[0] = await self.perform_scan()
found[0] = await self.perform_scan_wrapper(session=None)
else:
sessions = [
s
Expand Down Expand Up @@ -89,7 +90,7 @@ async def main(self) -> None:

logger.result(f"scanning in session {g_repr(session)}")

found[session] = await self.perform_scan(session)
found[session] = await self.perform_scan_wrapper(session)

await self.ecu.leave_session(session, sleep=self.config.power_cycle_sleep)

Expand All @@ -102,13 +103,10 @@ async def main(self) -> None:
except Exception:
logger.result(f" [{g_repr(sid)}] vendor specific sid: {data}")

async def perform_scan(self, session: None | int = None) -> dict[int, Any]:
async def perform_scan(self, session: None | int, progress: ProgressSetter) -> dict[int, Any]:
result: dict[int, Any] = {}

# Starts at 0x00, see first loop iteration.
sid = -1
while sid < 0xFF:
sid += 1
for sid in range(0xFF):
if sid & 0x40 and (not self.config.scan_response_ids):
continue

Expand All @@ -118,6 +116,8 @@ async def perform_scan(self, session: None | int = None) -> dict[int, Any]:
logger.info(f"{g_repr(sid)}: skipped")
continue

progress(sid, 0xff, "scanning services")

if session is not None and self.config.check_session:
if not await self.ecu.check_and_set_session(session):
logger.error(
Expand All @@ -140,7 +140,7 @@ async def perform_scan(self, session: None | int = None) -> dict[int, Any]:
UDSErrorCodes.serviceNotSupported,
UDSErrorCodes.serviceNotSupportedInActiveSession,
]:
logger.info(f"{g_repr(sid)}: not supported [{resp}]")
logger.debug(f"{g_repr(sid)}: not supported [{resp}]")
break

if isinstance(resp, NegativeResponse) and resp.response_code in [
Expand All @@ -153,3 +153,8 @@ async def perform_scan(self, session: None | int = None) -> dict[int, Any]:
break

return result

async def perform_scan_wrapper(self, session: None | int) -> dict[int, Any]:
with progress_bar() as p:
return await self.perform_scan(session, p)

91 changes: 91 additions & 0 deletions src/gallia/progress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# SPDX-FileCopyrightText: AISEC Pentesting Team
#
# SPDX-License-Identifier: Apache-2.0

# Adopted from: https://mdk.fr/blog/how-apt-does-its-fancy-progress-bar.html

import functools
import os
import shutil
from collections.abc import Callable, Iterator
from contextlib import contextmanager

eprint = functools.partial(print, end="", flush=True)


def save_cursor_position() -> None:
eprint("\0337")


def restore_cursor_position() -> None:
eprint("\0338")


def lock_footer() -> None:
_, lines = shutil.get_terminal_size()
eprint(f"\033[0;{lines-1}r")


def unlock_footer() -> None:
_, lines = shutil.get_terminal_size()
eprint(f"\033[0;{lines}r")


def move_to_footer() -> None:
_, lines = shutil.get_terminal_size()
eprint(f"\033[{lines};0f")


def move_cursor_up() -> None:
eprint("\033[1A")


def erase_line() -> None:
eprint("\033[2K")


def footer_init() -> None:
# Ensure the last line is available.
eprint("\n")
save_cursor_position()
lock_footer()
restore_cursor_position()
move_cursor_up()


def footer_deinit() -> None:
save_cursor_position()
unlock_footer()
move_to_footer()
erase_line()
restore_cursor_position()


type ProgressSetter = Callable[[int, int, str], None]


@contextmanager
def progress_bar(bar_len: int = 30) -> Iterator[ProgressSetter]:
def set_progress(count: int, total: int, suffix: str = "") -> None:
cols, _ = shutil.get_terminal_size()
filled_len = int(round(bar_len * count / float(total)))

percents = round(100.0 * count / float(total), 1)
bar = "█" * filled_len + "░" * (bar_len - filled_len)

content = f"{bar} {percents}% {suffix}"
if len(content) > cols:
content = content[: cols - 1] + "…"


save_cursor_position()
move_to_footer()
erase_line()
eprint(content)
restore_cursor_position()

footer_init()
try:
yield set_progress
finally:
footer_deinit()
36 changes: 16 additions & 20 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 28c2beb

Please sign in to comment.