Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added 'open_stream' to the Emitter (CRAFT-101). #13

Merged
merged 2 commits into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions craft_cli/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import enum
import itertools
import math
import os
import pathlib
import queue
import select
import shutil
import sys
import threading
Expand Down Expand Up @@ -61,6 +63,9 @@ class _MessageInfo: # pylint: disable=too-many-instance-attributes
# seconds between each spinner char
_SPINNER_DELAY = 0.1

# the size of bytes chunk that the pipe reader will read at once
_PIPE_READER_CHUNK_SIZE = 4096
facundobatista marked this conversation as resolved.
Show resolved Hide resolved


def _get_terminal_width() -> int:
"""Return the number of columns of the terminal."""
Expand Down Expand Up @@ -388,6 +393,92 @@ def advance(self, amount: Union[int, float]) -> None:
self.printer.progress_bar(self.stream, self.text, self.accumulated, self.total)


class _PipeReaderThread(threading.Thread):
"""A thread that reads bytes from a pipe and write lines to the Printer."""

def __init__(self, pipe: int, printer: _Printer, stream: Optional[TextIO]):
super().__init__()

# the pipe to read from
self.read_pipe = pipe

# special flag used to stop the pipe reader thread
self.stop_flag = False

# where to collect the content that is being read but yeat not written (waiting for
# a newline)
self.remaining_content = b""

# printer and stream to write the assembled lines
self.printer = printer
self.stream = stream

def _write(self, data: bytes) -> None:
"""Convert the byte stream into unicode lines and send it to the printer."""
pointer = 0
data = self.remaining_content + data
while True:
# get the position of next newline (find starts in pointer position)
newline_position = data.find(b"\n", pointer)
facundobatista marked this conversation as resolved.
Show resolved Hide resolved

# no more newlines, store the rest of data for the next time and break
if newline_position == -1:
self.remaining_content = data[pointer:]
break

# get the useful line and update pointer for next cycle (plus one, to
# skip the new line itself)
useful_line = data[pointer:newline_position]
pointer = newline_position + 1

# write the useful line to intended outputs
unicode_line = useful_line.decode("utf8")
text = f":: {unicode_line}"
self.printer.show(self.stream, text, end_line=True, use_timestamp=True)

def run(self) -> None:
while True:
rlist, _, _ = select.select([self.read_pipe], [], [], 0.1)
if rlist:
data = os.read(self.read_pipe, _PIPE_READER_CHUNK_SIZE)
self._write(data)
elif self.stop_flag:
# only quit when nothing left to read
break

def stop(self) -> None:
"""Stop the thread.

This flag ourselves to quit, but then makes the main thread (which is the one calling
this method) to wait ourselves to finish.
"""
self.stop_flag = True
self.join()


class _StreamContextManager:
"""A context manager that provides a pipe for subprocess to write its output."""

def __init__(self, printer: _Printer, text: str, stream: Optional[TextIO]):
# open a pipe; subprocess will write in it, we will read from the other end
pipe_r, self.pipe_w = os.pipe()

# show the intended text (explicitly asking for a complete line) before passing the
# output command to the pip-reading thread
printer.show(stream, text, end_line=True, use_timestamp=True)

# enable the thread to read and show what comes through the pipe
self.pipe_reader = _PipeReaderThread(pipe_r, printer, stream)

def __enter__(self):
self.pipe_reader.start()
return self.pipe_w

def __exit__(self, *exc_info):
self.pipe_reader.stop()
return False # do not consume any exception


def _init_guard(wrapped_func):
"""Decorate Emitter methods to be called *after* init."""

Expand Down Expand Up @@ -514,6 +605,16 @@ def progress_bar(self, text: str, total: Union[int, float], delta: bool = True)
self.printer.show(stream, text, ephemeral=True) # type: ignore
return _Progresser(self.printer, total, text, stream, delta) # type: ignore

@_init_guard
def open_stream(self, text: str):
"""Open a stream context manager to get messages from subprocesses."""
# don't show third party streams if quiet or normal
if self.mode == EmitterMode.QUIET or self.mode == EmitterMode.NORMAL:
stream = None
else:
stream = sys.stderr
return _StreamContextManager(self.printer, text, stream) # type: ignore

@_init_guard
def ended_ok(self) -> None:
"""Finish the messaging system gracefully."""
Expand Down
46 changes: 46 additions & 0 deletions tests/unit/test_messages_emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,52 @@ def test_progressbar_in_quiet_mode(get_initiated_emitter):
assert progresser.stream is None


@pytest.mark.parametrize(
"mode",
[
EmitterMode.QUIET,
EmitterMode.NORMAL,
],
)
def test_openstream_in_quietish_modes(get_initiated_emitter, mode):
"""Return a stream context manager with the output stream in None."""
emitter = get_initiated_emitter(mode)

with patch("craft_cli.messages._StreamContextManager") as stream_context_manager_mock:
instantiated_cm = object()
stream_context_manager_mock.return_value = instantiated_cm
context_manager = emitter.open_stream("some text")

assert emitter.printer_calls == []
assert context_manager is instantiated_cm
assert stream_context_manager_mock.mock_calls == [
call(emitter.printer, "some text", None),
]


@pytest.mark.parametrize(
"mode",
[
EmitterMode.VERBOSE,
EmitterMode.TRACE,
],
)
def test_openstream_in_verboseish_modes(get_initiated_emitter, mode):
"""Return a stream context manager with stderr as the output stream."""
emitter = get_initiated_emitter(mode)

with patch("craft_cli.messages._StreamContextManager") as stream_context_manager_mock:
instantiated_cm = object()
stream_context_manager_mock.return_value = instantiated_cm
context_manager = emitter.open_stream("some text")

assert emitter.printer_calls == []
assert context_manager is instantiated_cm
assert stream_context_manager_mock.mock_calls == [
call(emitter.printer, "some text", sys.stderr),
]


# -- tests for stopping the machinery


Expand Down
71 changes: 71 additions & 0 deletions tests/unit/test_messages_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

import logging
import re
import subprocess
import sys
import textwrap
from dataclasses import dataclass

import pytest
Expand Down Expand Up @@ -322,6 +325,74 @@ def test_04_5_trace_in_trace(capsys):
assert_outputs(capsys, emit, expected_err=expected, expected_log=expected)


@pytest.mark.parametrize(
"mode",
[
EmitterMode.QUIET,
EmitterMode.NORMAL,
],
)
def test_04_third_party_output_other_modes(capsys, tmp_path, mode):
"""Manage the streams produced for sub-executions, more quiet modes."""
# something to execute
script = tmp_path / "script.py"
script.write_text(
textwrap.dedent(
"""
import sys
print("foobar out", flush=True)
print("foobar err", file=sys.stderr, flush=True)
"""
)
)
emit = Emitter()
emit.init(mode, "testapp", GREETING)
with emit.open_stream("Testing stream") as stream:
subprocess.run([sys.executable, script], stdout=stream, stderr=stream, check=True)
emit.ended_ok()

expected = [
Line("Testing stream", timestamp=True),
Line(":: foobar out", timestamp=True),
Line(":: foobar err", timestamp=True),
]
assert_outputs(capsys, emit, expected_log=expected)


@pytest.mark.parametrize(
"mode",
[
EmitterMode.VERBOSE,
EmitterMode.TRACE,
],
)
def test_04_third_party_output_verbose(capsys, tmp_path, mode):
"""Manage the streams produced for sub-executions, debug and verbose mode."""
# something to execute
script = tmp_path / "script.py"
script.write_text(
textwrap.dedent(
"""
import sys
print("foobar out", flush=True)
print("foobar err", file=sys.stderr, flush=True)
"""
)
)
emit = Emitter()
emit.init(mode, "testapp", GREETING)
with emit.open_stream("Testing stream") as stream:
subprocess.run([sys.executable, script], stdout=stream, stderr=stream, check=True)
emit.ended_ok()

expected = [
Line("Testing stream", timestamp=True),
Line(":: foobar out", timestamp=True),
Line(":: foobar err", timestamp=True),
]
assert_outputs(capsys, emit, expected_err=expected, expected_log=expected)


@pytest.mark.parametrize(
"mode",
[
Expand Down
Loading