Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove trailing spaces where supported #210

Merged
merged 10 commits into from
Sep 12, 2024
89 changes: 66 additions & 23 deletions craft_cli/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import itertools
import math
import os
import platform
import queue
import shutil
import threading
Expand All @@ -44,6 +46,8 @@
# craft_cli/pytest_plugin.py )
TESTMODE = False

ANSI_CLEAR_LINE_TO_END = "\033[K" # ANSI escape code to clear the rest of the line.


@dataclass
class _MessageInfo:
Expand Down Expand Up @@ -71,7 +75,25 @@ def _get_terminal_width() -> int:
return shutil.get_terminal_size().columns


def _format_term_line(prefix: str, text: str, spintext: str, *, ephemeral: bool) -> str:
@lru_cache
def _supports_ansi_escape_sequences() -> bool:
"""Whether the current environment supports ANSI escape sequences."""
if platform.system() != "Windows":
return True
return "WT_SESSION" in os.environ # Windows Terminal supports ANSI escape sequences.


def _fill_line(text: str) -> str:
"""Turn the input text into a line that will fill the terminal."""
if _supports_ansi_escape_sequences():
return text + ANSI_CLEAR_LINE_TO_END
lengau marked this conversation as resolved.
Show resolved Hide resolved
width = _get_terminal_width()
# Fill the line but leave one character for the cursor.
n_spaces = width - len(text) % width - 1
return text + " " * n_spaces


def _format_term_line(previous_line_end: str, text: str, spintext: str, *, ephemeral: bool) -> str:
"""Format a line to print to the terminal."""
# fill with spaces until the very end, on one hand to clear a possible previous message,
# but also to always have the cursor at the very end
Expand All @@ -87,9 +109,8 @@ def _format_term_line(prefix: str, text: str, spintext: str, *, ephemeral: bool)
text = text[-remaining_for_last_line:]
if len(text) > usable:
text = text[: usable - 1] + "…"
cleaner = " " * (usable - len(text) % width)

return prefix + text + spintext + cleaner
return previous_line_end + _fill_line(text + spintext)


class _Spinner(threading.Thread):
Expand Down Expand Up @@ -226,38 +247,60 @@ def _get_prefixed_message_text(self, message: _MessageInfo) -> str:

return text

def _get_line_end(self, spintext: str) -> str:
"""Get the end of line to use when writing a line to the terminal."""
if spintext:
# forced to overwrite the previous message to present the spinner
return "\r"
if self.prv_msg is None or self.prv_msg.end_line:
# first message, or previous message completed the line: start clean
return ""
if self.prv_msg.ephemeral:
# the last one was ephemeral, overwrite it
return "\r"
# Previous line was ended; complete it.
return "\n"

def _write_line_terminal(self, message: _MessageInfo, *, spintext: str = "") -> None:
"""Write a simple line message to the screen."""
# prepare the text with (maybe) the timestamp
text = self._get_prefixed_message_text(message)
# prepare the text with (maybe) the timestamp and remove trailing spaces
text = self._get_prefixed_message_text(message).rstrip()

if message.use_timestamp:
timestamp_str = message.created_at.isoformat(sep=" ", timespec="milliseconds")
text = f"{timestamp_str} {text}"

if spintext:
# forced to overwrite the previous message to present the spinner
maybe_cr = "\r"
elif self.prv_msg is None or self.prv_msg.end_line:
# first message, or previous message completed the line: start clean
maybe_cr = ""
elif self.prv_msg.ephemeral:
# the last one was ephemeral, overwrite it
maybe_cr = "\r"
if self.prv_msg.stream != message.stream:
# If the last message's stream is different from this new one,
# send the carriage return to the original stream only.
print(maybe_cr, flush=True, file=self.prv_msg.stream, end="")
maybe_cr = ""
else:
# complete the previous line, leaving that message ok
maybe_cr = ""
previous_line_end = self._get_line_end(spintext)
if self.prv_msg and self.prv_msg.ephemeral and self.prv_msg.stream != message.stream:
# If the last message's stream is different from this new one,
# send a carriage return to the original stream only.
print("\r", flush=True, file=self.prv_msg.stream, end="")
previous_line_end = ""
if self.prv_msg and previous_line_end == "\n":
previous_line_end = ""
print(flush=True, file=self.prv_msg.stream)

# fill with spaces until the very end, on one hand to clear a possible previous message,
# but also to always have the cursor at the very end
width = _get_terminal_width()
usable = width - len(spintext) - 1 # the 1 is the cursor itself
if len(text) > usable:
if message.ephemeral:
text = text[: usable - 1] + "…"
elif spintext:
# we need to rewrite the message with the spintext, use only the last line for
# multiline messages, and ensure (again) that the last real line fits
remaining_for_last_line = len(text) % width
text = text[-remaining_for_last_line:]
if len(text) > usable:
text = text[: usable - 1] + "…"

# We don't need to rewrite the same ephemeral message repeatedly.
should_overwrite = spintext or message.end_line or not message.ephemeral
if should_overwrite or message != self.prv_msg:
line = _format_term_line(maybe_cr, text, spintext, ephemeral=message.ephemeral)
line = _format_term_line(
previous_line_end, text, spintext, ephemeral=message.ephemeral
)
print(line, end="", flush=True, file=message.stream)

if message.end_line:
Expand Down
21 changes: 12 additions & 9 deletions tests/integration/test_messages_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,24 @@ class Line:
regex: bool = False # if "text" is a regular expression instead of an exact string


def compare_lines(expected_lines: Collection[Line], raw_stream, std_stream):
def compare_lines(expected_lines: Collection[Line], raw_stream: str, std_stream):
"""Helper to compare expected lines to what was written to the terminal."""
width = printer._get_terminal_width()
terminal = printer._stream_is_terminal(std_stream)
if expected_lines:
assert len(raw_stream) > 0

if terminal:
# when showing to the terminal, it's completed always to screen width and terminated in
# different ways, so we split lines according to that length
assert (
len(raw_stream) % width == 0
), f"Bad length {len(raw_stream)} ({width=}) {raw_stream=!r}"
args = [iter(raw_stream)] * width
lines = ["".join(x) for x in zip(*args)] # pyright: ignore[reportGeneralTypeIssues]
if printer._supports_ansi_escape_sequences():
lines = raw_stream.replace("\033[K", "").splitlines(keepends=True)
else:
# If the terminal doesn't support ANSI escape sequences, we fill the screen
# width and don't terminate lines, so we split lines according to that length
assert (
len(raw_stream) % width == 0
), f"Bad length {len(raw_stream)} ({width=}) {raw_stream=!r}"
args = [iter(raw_stream)] * width
lines = ["".join(x) for x in zip(*args)] # pyright: ignore[reportGeneralTypeIssues]
else:
# when the output is captured, each line is simple and it should end in newline, so use
# that for splitting (but don't lose the newline)
Expand Down Expand Up @@ -1528,7 +1531,7 @@ def test_streaming_brief_spinner(capsys, logger, monkeypatch, init_emitter):
Line("Begin stage", permanent=False),
Line("Begin stage :: Opening stream", permanent=False),
Line("Begin stage :: Info message", permanent=False),
Line(r"Begin stage :: Info message - \(0.[7-9]s\)", permanent=False, regex=True),
Line(r"Begin stage :: Info message - \((0.[7-9]|1.0)s\)", permanent=False, regex=True),
Line("Begin stage :: Info message", permanent=False),
Line("Done stage", permanent=True),
]
Expand Down
Loading
Loading