Skip to content

Commit

Permalink
fix: make repeated text keep the spinner
Browse files Browse the repository at this point in the history
Fixes #138
  • Loading branch information
lengau committed Dec 2, 2023
1 parent 256d8c2 commit b73121e
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 20 deletions.
56 changes: 36 additions & 20 deletions craft_cli/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class _MessageInfo:
bar_total: int | float | None = None
use_timestamp: bool = False
end_line: bool = False
created_at: datetime = field(default_factory=datetime.now)
created_at: datetime = field(default_factory=datetime.now, compare=False)
terminal_prefix: str = ""


Expand All @@ -71,6 +71,27 @@ def _get_terminal_width() -> int:
return shutil.get_terminal_size().columns


def _format_term_line(prefix: str, text: str, spintext: str, *, ephemeral: bool) -> str:
"""Format a line to print to the terminal."""
# fill with spaces until the very end, on one hand to clear a possible previous message,
# but also to always have the cursor at the very end
width = _get_terminal_width()
usable = width - len(spintext) - 1 # the 1 is the cursor itself
if len(text) > usable:
if ephemeral:
text = text[: usable - 1] + "…"
elif spintext:
# we need to rewrite the message with the spintext, use only the last line for
# multiline messages, and ensure (again) that the last real line fits
remaining_for_last_line = len(text) % width
text = text[-remaining_for_last_line:]
if len(text) > usable:
text = text[: usable - 1] + "…"
cleaner = " " * (usable - len(text) % width)

return prefix + text + spintext + cleaner


class _Spinner(threading.Thread):
"""A supervisor thread that will repeat long-standing messages with a spinner besides it.
Expand Down Expand Up @@ -105,6 +126,9 @@ def __init__(self, printer: Printer) -> None:
# a lock to wait the spinner to stop spinning
self.lock = threading.Lock()

# Keep the message under supervision available for examination.
self._under_supervision: _MessageInfo | None = None

def run(self) -> None:
prv_msg = None
t_init = time.time()
Expand Down Expand Up @@ -136,6 +160,11 @@ def run(self) -> None:

def supervise(self, message: _MessageInfo | None) -> None:
"""Supervise a message to spin it if it remains too long."""
# Don't bother the spinner if we're repeating the same message
if message == self._under_supervision:
return

self._under_supervision = message
self.queue.put(message)
# (maybe) wait for the spinner to exit spinning state (which does some cleaning)
self.lock.acquire()
Expand Down Expand Up @@ -200,7 +229,6 @@ def _get_prefixed_message_text(self, message: _MessageInfo) -> str:
def _write_line_terminal(self, message: _MessageInfo, *, spintext: str = "") -> None:
"""Write a simple line message to the screen."""
# prepare the text with (maybe) the timestamp

text = self._get_prefixed_message_text(message)

if message.use_timestamp:
Expand All @@ -226,24 +254,12 @@ def _write_line_terminal(self, message: _MessageInfo, *, spintext: str = "") ->
maybe_cr = ""
print(flush=True, file=self.prv_msg.stream)

# fill with spaces until the very end, on one hand to clear a possible previous message,
# but also to always have the cursor at the very end
width = _get_terminal_width()
usable = width - len(spintext) - 1 # the 1 is the cursor itself
if len(text) > usable:
if message.ephemeral:
text = text[: usable - 1] + "…"
elif spintext:
# we need to rewrite the message with the spintext, use only the last line for
# multiline messages, and ensure (again) that the last real line fits
remaining_for_last_line = len(text) % width
text = text[-remaining_for_last_line:]
if len(text) > usable:
text = text[: usable - 1] + "…"
cleaner = " " * (usable - len(text) % width)

line = maybe_cr + text + spintext + cleaner
print(line, end="", flush=True, file=message.stream)
# We don't need to rewrite the same ephemeral message repeatedly.
should_overwrite = spintext or message.end_line or not message.ephemeral
if should_overwrite or message != self.prv_msg:
line = _format_term_line(maybe_cr, text, spintext, ephemeral=message.ephemeral)
print(line, end="", flush=True, file=message.stream)

if message.end_line:
# finish the just shown line, as we need a clean terminal for some external thing
print(flush=True, file=message.stream)
Expand Down
27 changes: 27 additions & 0 deletions examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,33 @@ def _call_lib(logger, index):
time.sleep(2)


def example_30():
"""Message spamming, noting the different spinner behaviour"""
emit.progress(
"Message spamming example. The same message will be spammed for 10s, but "
"it will appear as one message with a spinner.",
permanent=True,
)
end_time = time.monotonic() + 10
while time.monotonic() < end_time:
emit.progress("SPAM SPAM SPAM SPAM")
time.sleep(0.001)
emit.progress(
"Now two separate messages will be spammed and no spinner appear.", permanent=True
)
end_time = time.monotonic() + 10
while time.monotonic() < end_time:
emit.progress("SPAM SPAM SPAM SPAM")
time.sleep(0.01)
emit.progress("SPAM SPAM SPAM baked beans")
time.sleep(0.01)
emit.progress("And back to the first message!", permanent=True)
end_time = time.monotonic() + 10
while time.monotonic() < end_time:
emit.progress("SPAM SPAM SPAM SPAM")
time.sleep(0.001)


# -- end of test cases

if len(sys.argv) < 2:
Expand Down
116 changes: 116 additions & 0 deletions tests/unit/test_printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,82 @@ def test_writelineterminal_spintext_length_just_exceeded(capsys, monkeypatch, lo
assert out == "\r0x1x2x3x4x… * 3.15s"


@pytest.mark.parametrize("test_text", ["", "Some test text."])
def test_writelineterminal_ephemeral_spam(capsys, monkeypatch, log_filepath, test_text):
"""Spam _write_line_terminal with the same message over and over."""
monkeypatch.setattr(printermod, "_get_terminal_width", lambda: 40)
printer = Printer(log_filepath)

for _ in range(10):
# Recreate the message here so we're checking equality, not just identity.
msg = _MessageInfo(sys.stdout, test_text, end_line=False, ephemeral=True)
printer._write_line_terminal(msg)
printer.prv_msg = msg

assert printer.unfinished_stream == sys.stdout

out, err = capsys.readouterr()
assert not err

# output completes the terminal width (leaving space for the cursor), and
# without a finishing newline
# There will only be one copy of the text.
assert out == test_text[:40] + " " * (39 - len(test_text))


@pytest.mark.parametrize(("ephemeral", "end_line"), [(False, False), (False, True), (True, True)])
@pytest.mark.parametrize("text", ["", "Some test text"])
def test_writelineterminal_rewrites_same_message(
capsys, monkeypatch, log_filepath, text, ephemeral, end_line
):
"""Spam _write_line_terminal with the same message and ensure it keeps writing."""
monkeypatch.setattr(printermod, "_get_terminal_width", lambda: 40)
printer = Printer(log_filepath)

for _ in range(10):
message = _MessageInfo(sys.stdout, text, ephemeral=ephemeral, end_line=end_line)
printer._write_line_terminal(message)
printer.prv_msg = message

out, err = capsys.readouterr()
assert not err

# output completes the terminal width (leaving space for the cursor), and
# without a finishing newline
assert out.strip() == "\n".join([text + " " * (39 - len(text))] * 10).strip()


@pytest.mark.parametrize("ephemeral", [True, False])
@pytest.mark.parametrize("text", ["", "Some test text"])
@pytest.mark.parametrize(
"spintext",
[
"!!!!!!!!!!",
"\\|/-",
"1234567890",
],
)
def test_writelineterminal_rewrites_same_message_with_spintext(
capsys, monkeypatch, log_filepath, text, spintext, ephemeral
):
"""Spam _write_line_terminal with the same message over and over."""
monkeypatch.setattr(printermod, "_get_terminal_width", lambda: 40)
printer = Printer(log_filepath)

for spin in spintext:
message = _MessageInfo(sys.stdout, text, ephemeral=ephemeral, end_line=False)
printer._write_line_terminal(message, spintext=spin)
printer.prv_msg = message

out, err = capsys.readouterr()
assert not err

# output completes the terminal width (leaving space for the cursor), and
# without a finishing newline
expected = "\r".join(text + s + " " * (39 - len(text) - len(s)) for s in spintext)
assert out.strip() == expected.strip()


# -- tests for the writing line (captured version) function


Expand Down Expand Up @@ -1055,6 +1131,46 @@ def test_spinner_working_simple(spinner, monkeypatch):
assert spinner.printer.spinned[-1] == (msg, " ")


def test_spinner_spam(spinner, monkeypatch):
"""Test that the spinner works properly when spamming the same message.
The expected behaviour is to ignore the existence of the fresh message and just
write when the spinner needs to update.
"""
# set absurdly low times so we can have several spin texts in the test
monkeypatch.setattr(printermod, "_SPINNER_THRESHOLD", 0.001)
monkeypatch.setattr(printermod, "_SPINNER_DELAY", 0.001)

# send a message, wait enough until we have enough spinned to test, and turn it off
msg = _MessageInfo(sys.stdout, "test msg")
for _ in range(100):
spinner.supervise(_MessageInfo(sys.stdout, "test msg"))
for _ in range(100):
if len(spinner.printer.spinned) >= 6:
break
time.sleep(0.01)
else:
pytest.fail("Waited too long for the _Spinner to generate messages")
spinner.supervise(None)
to_check = spinner.printer.spinned[:5]

# check the initial messages complete the "spinner drawing" also showing elapsed time
spinned_messages, spinned_texts = list(zip(*to_check))
assert all(spinned_msg == msg for spinned_msg in spinned_messages)
expected_texts = (
r" - \(\d\.\ds\)",
r" \\ \(\d\.\ds\)",
r" | \(\d\.\ds\)",
r" / \(\d\.\ds\)",
r" - \(\d\.\ds\)",
)
for expected, real in list(zip(expected_texts, spinned_texts)):
assert re.match(expected, real)

# the last message should clean the spinner
assert spinner.printer.spinned[-1] == (msg, " ")


def test_spinner_two_messages(spinner, monkeypatch):
"""Two consecutive messages with spinner."""
# set absurdly low times so we can have several spin texts in the test
Expand Down

0 comments on commit b73121e

Please sign in to comment.