Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove volatile-info in favor of a bottom progress bar (cf. apt install) #643

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/gallia/cli/gallia.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ def __call__(
if setup_log:
setup_logging(
level=get_log_level(config.verbose),
no_volatile_info=not config.volatile_info,
)

sys.exit(get_command(config).entry_point())
Expand Down
4 changes: 1 addition & 3 deletions src/gallia/command/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,7 @@ class BaseCommandConfig(GalliaBaseModel, cli_group="generic", config_section="ga
model_config = ConfigDict(arbitrary_types_allowed=True)

verbose: int = Field(0, description="increase verbosity on the console", short="v")
volatile_info: bool = Field(
True, description="Overwrite log lines with level info or lower in terminal output"
)
progress: bool = Field(True, description="Show a progress bar at the bottom of the terminal")
trace_log: bool = Field(False, description="set the loglevel of the logfile to TRACE")
pre_hook: str | None = Field(
None,
Expand Down
20 changes: 12 additions & 8 deletions src/gallia/commands/scan/uds/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from gallia.command.config import Field, Ranges, Ranges2D
from gallia.command.uds import UDSScannerConfig
from gallia.log import get_logger
from gallia.progress import ProgressSetter, progress_bar
from gallia.services.uds import (
NegativeResponse,
UDSErrorCodes,
Expand Down Expand Up @@ -58,7 +59,7 @@ async def main(self) -> None:
found: dict[int, dict[int, Any]] = {}

if self.config.sessions is None:
found[0] = await self.perform_scan()
found[0] = await self.perform_scan_wrapper(session=None)
else:
sessions = [
s
Expand Down Expand Up @@ -89,7 +90,7 @@ async def main(self) -> None:

logger.result(f"scanning in session {g_repr(session)}")

found[session] = await self.perform_scan(session)
found[session] = await self.perform_scan_wrapper(session)

await self.ecu.leave_session(session, sleep=self.config.power_cycle_sleep)

Expand All @@ -102,13 +103,10 @@ async def main(self) -> None:
except Exception:
logger.result(f" [{g_repr(sid)}] vendor specific sid: {data}")

async def perform_scan(self, session: None | int = None) -> dict[int, Any]:
async def perform_scan(self, session: None | int, progress: ProgressSetter) -> dict[int, Any]:
result: dict[int, Any] = {}

# Starts at 0x00, see first loop iteration.
sid = -1
while sid < 0xFF:
sid += 1
for sid in range(0xFF):
if sid & 0x40 and (not self.config.scan_response_ids):
continue

Expand All @@ -118,6 +116,8 @@ async def perform_scan(self, session: None | int = None) -> dict[int, Any]:
logger.info(f"{g_repr(sid)}: skipped")
continue

progress(sid, 0xFF, "scanning services")

if session is not None and self.config.check_session:
if not await self.ecu.check_and_set_session(session):
logger.error(
Expand All @@ -140,7 +140,7 @@ async def perform_scan(self, session: None | int = None) -> dict[int, Any]:
UDSErrorCodes.serviceNotSupported,
UDSErrorCodes.serviceNotSupportedInActiveSession,
]:
logger.info(f"{g_repr(sid)}: not supported [{resp}]")
logger.debug(f"{g_repr(sid)}: not supported [{resp}]")
break

if isinstance(resp, NegativeResponse) and resp.response_code in [
Expand All @@ -153,3 +153,7 @@ async def perform_scan(self, session: None | int = None) -> dict[int, Any]:
break

return result

async def perform_scan_wrapper(self, session: None | int) -> dict[int, Any]:
with progress_bar() as p:
return await self.perform_scan(session, p)
30 changes: 5 additions & 25 deletions src/gallia/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ def to_level(self) -> Loglevel:
def setup_logging(
level: Loglevel | None = None,
color_mode: ColorMode = ColorMode.AUTO,
no_volatile_info: bool = False,
logger_name: str = "gallia",
) -> None:
"""Enable and configure gallia's logging system.
Expand Down Expand Up @@ -270,13 +269,12 @@ def setup_logging(
logger.handlers[0].close()
logger.removeHandler(logger.handlers[0])
colored = resolve_color_mode(color_mode)
add_stderr_log_handler(logger_name, level, no_volatile_info, colored)
add_stderr_log_handler(logger_name, level, colored)


def add_stderr_log_handler(
logger_name: str,
level: Loglevel,
no_volatile_info: bool,
colored: bool,
) -> None:
queue: Queue[Any] = Queue()
Expand All @@ -288,9 +286,6 @@ def add_stderr_log_handler(
console_formatter = _ConsoleFormatter()

console_formatter.colored = colored
stderr_handler.terminator = "" # We manually handle the terminator while formatting
if no_volatile_info is False:
console_formatter.volatile_info = True

stderr_handler.setFormatter(console_formatter)

Expand Down Expand Up @@ -346,9 +341,9 @@ class _PenlogRecordV2:
_PenlogRecord: TypeAlias = _PenlogRecordV2


def _colorize_msg(data: str, levelno: int) -> tuple[str, int]:
def _colorize_msg(data: str, levelno: int) -> str:
if sys.platform == "win32" or not sys.stderr.isatty():
return data, 0
return data

out = ""
match levelno:
Expand All @@ -373,7 +368,7 @@ def _colorize_msg(data: str, levelno: int) -> tuple[str, int]:
out += data
out += _Color.RESET.value

return out, len(style)
return out


def _format_record( # noqa: PLR0913
Expand All @@ -384,12 +379,8 @@ def _format_record( # noqa: PLR0913
tags: list[str] | None,
stacktrace: str | None,
colored: bool = False,
volatile_info: bool = False,
) -> str:
msg = ""
if volatile_info:
msg += "\33[2K"
extra_len = 4
msg += dt.strftime("%b %d %H:%M:%S.%f")[:-3]
msg += " "
msg += name
Expand All @@ -398,20 +389,10 @@ def _format_record( # noqa: PLR0913
msg += ": "

if colored:
tmp_msg, extra_len_tmp = _colorize_msg(data, levelno)
msg += tmp_msg
extra_len += extra_len_tmp
msg += _colorize_msg(data, levelno)
else:
msg += data

if volatile_info and levelno <= Loglevel.INFO:
terminal_width, _ = shutil.get_terminal_size()
msg = msg[: terminal_width + extra_len - 1] # Adapt length to invisible ANSI colors
msg += _Color.RESET.value
msg += "\r"
else:
msg += "\n"

if stacktrace is not None:
msg += "\n"
msg += stacktrace
Expand Down Expand Up @@ -732,7 +713,6 @@ def format(
tags=record.__dict__["tags"] if "tags" in record.__dict__ else None,
stacktrace=stacktrace,
colored=self.colored,
volatile_info=self.volatile_info,
)


Expand Down
89 changes: 89 additions & 0 deletions src/gallia/progress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# SPDX-FileCopyrightText: AISEC Pentesting Team
#
# SPDX-License-Identifier: Apache-2.0

# Adopted from: https://mdk.fr/blog/how-apt-does-its-fancy-progress-bar.html

import functools
import shutil
from collections.abc import Callable, Iterator
from contextlib import contextmanager

eprint = functools.partial(print, end="", flush=True)


def save_cursor_position() -> None:
eprint("\0337")


def restore_cursor_position() -> None:
eprint("\0338")


def lock_footer() -> None:
_, lines = shutil.get_terminal_size()
eprint(f"\033[0;{lines-1}r")


def unlock_footer() -> None:
_, lines = shutil.get_terminal_size()
eprint(f"\033[0;{lines}r")


def move_to_footer() -> None:
_, lines = shutil.get_terminal_size()
eprint(f"\033[{lines};0f")


def move_cursor_up() -> None:
eprint("\033[1A")


def erase_line() -> None:
eprint("\033[2K")


def footer_init() -> None:
# Ensure the last line is available.
eprint("\n")
save_cursor_position()
lock_footer()
restore_cursor_position()
move_cursor_up()


def footer_deinit() -> None:
save_cursor_position()
unlock_footer()
move_to_footer()
erase_line()
restore_cursor_position()


type ProgressSetter = Callable[[int, int, str], None]


@contextmanager
def progress_bar(bar_len: int = 30) -> Iterator[ProgressSetter]:
def set_progress(count: int, total: int, suffix: str = "") -> None:
cols, _ = shutil.get_terminal_size()
filled_len = int(round(bar_len * count / float(total)))

percents = round(100.0 * count / float(total), 1)
bar = "█" * filled_len + "░" * (bar_len - filled_len)

content = f"{bar} {percents}% {suffix}"
if len(content) > cols:
content = content[: cols - 1] + "…"

save_cursor_position()
move_to_footer()
erase_line()
eprint(content)
restore_cursor_position()

footer_init()
try:
yield set_progress
finally:
footer_deinit()
1 change: 0 additions & 1 deletion tests/bats/helpers.bash
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ common_setup() {
setup_gallia_toml() {
{
echo "[gallia]"
echo "no-volatile-info = true"
echo "verbosity = 1"

echo "[gallia.scanner]"
Expand Down
1 change: 0 additions & 1 deletion tests/bats/run_bats.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
set -eu

gallia script vecu rng "unix-lines:///tmp/vecu.sock" \
--no-volatile-info \
--seed 3 \
--mandatory-sessions 1 2 3 \
--mandatory-services DiagnosticSessionControl EcuReset ReadDataByIdentifier WriteDataByIdentifier RoutineControl SecurityAccess ReadMemoryByAddress WriteMemoryByAddress RequestDownload RequestUpload TesterPresent ReadDTCInformation ClearDiagnosticInformation InputOutputControlByIdentifier 2>vecu.log &
Expand Down
Loading