Skip to content

Commit

Permalink
node: Improve live progress output
Browse files Browse the repository at this point in the history
This upgrades the progress output quite a bit:

- There's a pretty animation at the start of the line!
- Instead of printing the last package interacted with (which made no
  sense), show as many of the currently-processing packages as can fit
  on a single line.
- Show live download progress for large downloads, so any that are
  holding up generation don't result in appears to be a hang.

In addition, this adds a hidden CLI flag, --traceback-on-interrupt, that
prints a full traceback from every active package coroutine on Ctrl-C,
which was useful while debugging this (and probably will be in the
future!)

Signed-off-by: Ryan Gonzalez <ryan.gonzalez@collabora.com>
  • Loading branch information
refi64 committed Aug 26, 2022
1 parent 6d0ad8f commit 8a618db
Show file tree
Hide file tree
Showing 5 changed files with 347 additions and 40 deletions.
203 changes: 203 additions & 0 deletions node/flatpak_node_generator/console.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
from typing import IO, Iterator, List, Optional

import contextlib
import enum
import io
import os
import queue
import sys
import threading
import time

_MOVE_UP = '\033[A'
_CLEAR_LINE = '\033[K'


class _ConsoleState(enum.Enum):
ACTIVE = enum.auto()
INACTIVE = enum.auto()
DISABLED = enum.auto()


_console_state = _ConsoleState.INACTIVE

_lines: List['AnimatedConsoleLine'] = []
_lines_lock = threading.Lock()

_print_queue: 'queue.Queue[str]' = queue.Queue()

_update_event = threading.Event()
_done_event = threading.Event()


def _get_next_frame_update_unlocked() -> Optional[float]:
if not _lines:
return None

return min(l._next_frame_update for l in _lines)


def _get_next_frame_update() -> Optional[float]:
with _lines_lock:
return _get_next_frame_update_unlocked()


def _flush_print_queue(*, output: IO[str]) -> None:
while True:
try:
content = _print_queue.get_nowait()
except queue.Empty:
break

output.write(content)


class _UpdateThread(threading.Thread):
def __init__(self, output: IO[str]) -> None:
super().__init__()
self.output = output

def run(self) -> None:
visible_lines = 0

while True:
_FRAME_DURATION = 0.05

next_frame_update = _get_next_frame_update()
if next_frame_update is not None:
now = time.monotonic()
if now < next_frame_update:
_update_event.wait(next_frame_update - now)
else:
_update_event.wait()

if _done_event.is_set():
_flush_print_queue(output=self.output)
self.output.flush()
break

output = io.StringIO()
output.write((_MOVE_UP + _CLEAR_LINE) * visible_lines)

now = time.monotonic()

_flush_print_queue(output=output)

with _lines_lock:
for line in _lines:
if line._next_frame_update <= now:
line._active_frame = (line._active_frame + 1) % len(
line._FRAMES
)
line._next_frame_update = now + _FRAME_DURATION

print(line._FRAMES[line._active_frame], line._content, file=output)

visible_lines = len(_lines)
next_frame_update = _get_next_frame_update_unlocked()

self.output.write(output.getvalue())
self.output.flush()

sleep_duration = 0.05
if next_frame_update is not None:
sleep_duration = min(sleep_duration, next_frame_update - now)

_done_event.wait(sleep_duration)


class _ForwardThread(threading.Thread):
def __init__(self, input: IO[str]) -> None:
super().__init__()
self.input = input

def run(self) -> None:
with self.input:
while True:
data = self.input.read(4096)
if not data:
break

_print_queue.put(data)
_update_event.set()


def disable_fancy_console() -> None:
global _console_state
_console_state = _ConsoleState.DISABLED


@contextlib.contextmanager
def activate_fancy_console() -> Iterator[None]:
global _console_state
assert _console_state != _ConsoleState.ACTIVE

with contextlib.ExitStack() as stack:
forward: Optional[_ForwardThread] = None

if _console_state != _ConsoleState.DISABLED and sys.stderr.isatty():
_UpdateThread(sys.stderr).start()

reader_fd, writer_fd = os.pipe()
reader = os.fdopen(reader_fd, 'r')
writer = os.fdopen(writer_fd, 'w')

stack.enter_context(writer)
stack.enter_context(contextlib.redirect_stderr(writer))

if sys.stdout.isatty():
# Just assume that, if both are a TTY, they're going to the *same* TTY.
stack.enter_context(contextlib.redirect_stdout(writer))

forward = _ForwardThread(reader)
forward.start()

_console_state = _ConsoleState.ACTIVE

try:
yield
finally:
if _console_state == _ConsoleState.ACTIVE:
_console_state = _ConsoleState.INACTIVE

stack.close()

_done_event.set()
_update_event.set()

if forward is not None:
forward.join()


class AnimatedConsoleLine:
_FRAMES = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']

HEADING_WIDTH = 2

def __init__(self) -> None:
self._content = ''
self._next_frame_update = 0.0
self._active_frame = -1

@staticmethod
def claim() -> Optional['AnimatedConsoleLine']:
if _console_state != _ConsoleState.ACTIVE:
return None

with _lines_lock:
line = AnimatedConsoleLine()
_lines.append(line)
return line

def release(self) -> None:
with _lines_lock:
_lines.remove(self)
_update_event.set()

@property
def content(self) -> str:
return self._content

def update(self, content: str) -> None:
self._content = content
_update_event.set()
34 changes: 26 additions & 8 deletions node/flatpak_node_generator/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import sys
import time

from flatpak_node_generator.console import activate_fancy_console, disable_fancy_console

from .cache import Cache, FilesystemBasedCache
from .manifest import ManifestGenerator
from .node_headers import NodeHeaders
Expand Down Expand Up @@ -53,6 +55,11 @@ async def _async_main() -> None:
action='append',
help='Given -r, restrict files to those matching the given pattern.',
)
parser.add_argument(
'--no-live-progress',
action='store_true',
help='Disable live progress output',
)
parser.add_argument(
'--registry',
help='The registry to use (npm only)',
Expand Down Expand Up @@ -136,13 +143,21 @@ async def _async_main() -> None:
dest='xdg_layout',
help="Don't use the XDG layout for caches",
)
# Internal option, useful for testing.
# Internal options, useful for testing.
parser.add_argument('--stub-requests', action='store_true', help=argparse.SUPPRESS)
parser.add_argument(
'--traceback-on-interrupt',
action='store_true',
help=argparse.SUPPRESS,
)

args = parser.parse_args()

Requests.retries = args.retries

if args.no_live_progress:
disable_fancy_console()

if args.type == 'yarn' and (args.no_devel or args.no_autopatch):
sys.exit('--no-devel and --no-autopatch do not apply to Yarn.')

Expand Down Expand Up @@ -223,13 +238,16 @@ async def _async_main() -> None:
)
special = SpecialSourceProvider(gen, options)

with provider_factory.create_module_provider(gen, special) as module_provider:
with GeneratorProgress(
packages,
module_provider,
args.max_parallel,
) as progress:
await progress.run()
with provider_factory.create_module_provider(
gen, special
) as module_provider, activate_fancy_console(), GeneratorProgress(
packages,
module_provider,
max_parallel=args.max_parallel,
traceback_on_interrupt=args.traceback_on_interrupt,
) as progress:
await progress.run()

for headers in rcfile_node_headers:
print(f'Generating headers {headers.runtime} @ {headers.target}')
await special.generate_node_headers(headers)
Expand Down
81 changes: 52 additions & 29 deletions node/flatpak_node_generator/progress.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from typing import Collection, ContextManager, Optional, Type
from typing import Collection, ContextManager, Optional, Set, Type

import asyncio
import shutil
import sys
import traceback
import types

from flatpak_node_generator.console import AnimatedConsoleLine

from .package import Package
from .providers import ModuleProvider

Expand All @@ -14,60 +17,80 @@ def __init__(
self,
packages: Collection[Package],
module_provider: ModuleProvider,
*,
max_parallel: int,
traceback_on_interrupt: bool,
) -> None:
self.finished = 0
self.processing: Set[Package] = set()
self.packages = packages
self.module_provider = module_provider
self.parallel_limit = asyncio.Semaphore(max_parallel)
self.previous_package: Optional[Package] = None
self.current_package: Optional[Package] = None
self.traceback_on_interrupt = traceback_on_interrupt
self.line = AnimatedConsoleLine.claim()

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
tb: Optional[types.TracebackType],
) -> None:
print()
if self.line is not None:
self.line.release()

def _format_package(self, package: Package, max_width: int) -> str:
result = f'{package.name} @ {package.version}'
def _update(self) -> None:
SEP = ', '
OMISSION_ELLIPSES = '...'

if len(result) > max_width:
result = result[: max_width - 3] + '...'
line = f'Generating packages [{self.finished}/{len(self.packages)}]'

return result
if self.line is None:
# No TTY.
print(line, file=sys.stderr)
return

def _update(self) -> None:
columns, _ = shutil.get_terminal_size()
line += ' => '

sys.stdout.write('\r' + ' ' * columns)
columns = shutil.get_terminal_size().columns - AnimatedConsoleLine.HEADING_WIDTH

prefix_string = f'\rGenerating packages [{self.finished}/{len(self.packages)}] '
sys.stdout.write(prefix_string)
max_package_width = columns - len(prefix_string)
packages = sorted(
f'{package.name} @ {package.version}' for package in self.processing
)
line += SEP.join(packages)

if self.current_package is not None:
sys.stdout.write(
self._format_package(self.current_package, max_package_width)
)
# Remove packages off the end until the line & ellipses fit.
while len(line) > columns:
last_sep = line.rfind(SEP)
if last_sep == -1:
break

sys.stdout.flush()
line = line[:last_sep]
if len(line) + len(SEP) + len(OMISSION_ELLIPSES) < columns:
line += SEP + OMISSION_ELLIPSES
break

def _update_with_package(self, package: Package) -> None:
self.previous_package, self.current_package = (
self.current_package,
package,
)
self._update()
self.line.update(line)

async def _generate(self, package: Package) -> None:
async with self.parallel_limit:
self._update_with_package(package)
await self.module_provider.generate_package(package)
self.processing.add(package)
# Don't bother printing an update here without live progress, since then the
# currently processing packages won't appear anyway.
if self.line is not None:
self._update()

try:
await self.module_provider.generate_package(package)
except asyncio.CancelledError:
if self.traceback_on_interrupt:
print(f'========== {package.name} ==========', file=sys.stderr)
traceback.print_exc()
print(file=sys.stderr)
raise

self.finished += 1
self._update_with_package(package)
self.processing.remove(package)
self._update()

async def run(self) -> None:
self._update()
Expand Down
Loading

0 comments on commit 8a618db

Please sign in to comment.