Skip to content

Commit

Permalink
fix: Exit properly and faster upon interruption
Browse files Browse the repository at this point in the history
- Fix: Deadlock when multiprocessing is enabled and a direct `SIGINT`
  is recieved or an unhandled exception is raised, in the main process.

  When a `SIGINT` was recieved by the main process but not its children
  (unlike hitting `Ctrl-C` in the terminal) or an unhandled exception
  was raised, the main process blocked on trying to join its children
  which would still be running.
  Recieving subsequent `SIGINT`(s) terminated the main process but left
  the children running.

- Add: `.logging_multi.multi_logger`.
- Add: `.__main__.MAIN_THREAD.
- Change: Replace `.cli.interrupted` with `.__main__.interrupted`.
- Change: Make all subprocesses and threads (except `LoadingIndicator`
  and `MultiLogger`) daemon so they don't hinder the main process from
  exiting, and die when the main process exits.
- Change: Subprocesses and non-main threads no longer check for main
  process interruption.
- Change: All log records and notifications from subprocesses and
  non-main threads are no longer emitted after the main process is
  interrupted.
- Change: No longer wait for `Opener`, `Getter-N` and `CheckManager`
  threads to exit normally upon main process interruption.
- Change: Subprocesses are no longer joined when the main process is
  interrupted (by `SIGINT` or an exception).
- Change: Join the `MultiLogger` thread instead of the log queue as
  items could now possibly be put into the queue after ending
  multi-process logging since the subprocesses are now daemon.
- Change: `.logging_multi.loq_queue`: `multiprocessing.JoinableQueue`
  -> `multiprocessing.Queue` since the the queue is no longer joined.
- Change: Update comments.

Note: Sending `SIGINT` to the subprocesses would yield undesired
results in the case of `Ctrl-C` being hit in the terminal as it'll
result in subprocesses receiving double `SIGINT` and possibly an
exception traceback being printed to STDERR by every subprocess.
  • Loading branch information
AnonymouX47 committed May 7, 2024
1 parent c3d972b commit b90ceef
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 82 deletions.
28 changes: 20 additions & 8 deletions src/termvisage/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging as _logging
import multiprocessing
import sys
from threading import Event
from threading import Thread, main_thread

from term_image.utils import write_tty

Expand All @@ -22,6 +22,8 @@ def main() -> int:

from . import cli, logging, notify, tui

global MAIN_THREAD, interrupted

def cleanup_temp_dir():
if not TEMP_DIR:
return
Expand All @@ -47,28 +49,31 @@ def finish_loading():

def finish_multi_logging():
if logging.initialized and logging.MULTI:
from .logging_multi import child_processes, log_queue
from .logging_multi import child_processes, log_queue, multi_logger

if not interrupted:
for process in child_processes:
process.join()

for process in child_processes:
process.join()
log_queue.put((None,) * 2) # End of logs
log_queue.join()
multi_logger.join()

# 1. `PIL.Image.open()` seems to cause forked child processes to block when called
# in both the parent and the child.
# 2. Unifies things across multiple platforms.
multiprocessing.set_start_method("spawn")

MAIN_THREAD = main_thread()

logger = _logging.getLogger("termvisage")
logger.setLevel(_logging.INFO)

cli.interrupted = Event()
try:
write_tty(b"\033[22;2t") # Save window title
write_tty(b"\033]2;TermVisage\033\\") # Set window title
exit_code = cli.main()
except KeyboardInterrupt:
cli.interrupted.set() # Signal interruption to subprocesses and other threads.
interrupted = True
finish_loading()
finish_multi_logging()
cleanup_temp_dir()
Expand All @@ -87,7 +92,7 @@ def finish_multi_logging():
raise
return INTERRUPTED
except Exception as e:
cli.interrupted.set() # Signal interruption to subprocesses and other threads.
interrupted = True
finish_loading()
finish_multi_logging()
cleanup_temp_dir()
Expand Down Expand Up @@ -121,5 +126,12 @@ def finish_multi_logging():
# Updated from `.cli.main()`.
TEMP_DIR: str | None = None

# The main thread of the main process.
# Set from `main()`.
MAIN_THREAD: Thread

# Process interruption flag.
interrupted: bool = False

if __name__ == "__main__":
sys.exit(main())
64 changes: 18 additions & 46 deletions src/termvisage/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from os.path import abspath, basename, exists, isdir, isfile, islink, realpath
from queue import Empty, Queue
from tempfile import mkdtemp
from threading import Event, current_thread
from threading import current_thread
from time import sleep
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Union
from urllib.parse import urlparse
Expand Down Expand Up @@ -42,7 +42,7 @@
from .logging import Thread, init_log, log, log_exception
from .logging_multi import Process
from .tui.widgets import Image
from .utils import CSI, clear_queue
from .utils import CSI

try:
import fcntl # noqa: F401
Expand Down Expand Up @@ -160,8 +160,6 @@ def check_dir(
empty = True
content = {}
for entry in entries:
if interrupted and interrupted.is_set():
break
if not SHOW_HIDDEN and entry.name.startswith("."):
continue
try:
Expand Down Expand Up @@ -400,6 +398,7 @@ def process_result(
free_checkers,
globals_,
),
daemon=True,
)
for n in range(n_checkers)
]
Expand All @@ -418,8 +417,7 @@ def process_result(
setitem(checks_in_progress, *progress_queue.get())

while not (
interrupted.is_set() # MainThread has been interrupted
or not any(checks_in_progress) # All checkers are dead
not any(checks_in_progress) # All checkers are dead
# All checks are done
or (
# No check in progress
Expand Down Expand Up @@ -459,12 +457,6 @@ def process_result(

sleep(0.01) # Allow queue sizes to be updated
finally:
if interrupted.is_set():
clear_queue(dir_queue)
clear_queue(content_queue)
clear_queue(progress_queue)
return

if not any(checks_in_progress):
log(
"All checkers were terminated, checking directory sources failed!",
Expand All @@ -490,7 +482,7 @@ def process_result(
current_thread.name = "Checker"

_, links, source, _depth = dir_queue.get()
while not interrupted.is_set() and source:
while source:
log(f"Checking {source!r}", logger, verbose=True)
if islink(source):
links.append((source, realpath(source)))
Expand All @@ -504,13 +496,10 @@ def process_result(
source = abspath(source)
contents[source] = result
images.append((source, ...))
elif not interrupted.is_set() and result is None:
elif result is None:
log(f"{source!r} is empty", logger, verbose=True)
_, links, source, _depth = dir_queue.get()

if interrupted.is_set():
clear_queue(dir_queue)


def update_contents(
dir: str,
Expand Down Expand Up @@ -555,7 +544,7 @@ def get_urls(
) -> None:
"""Processes URL sources from a/some separate thread(s)"""
source = url_queue.get()
while not interrupted.is_set() and source:
while source:
log(f"Getting image from {source!r}", logger, verbose=True)
try:
images.append((basename(source), Image(ImageClass.from_url(source))))
Expand All @@ -572,17 +561,14 @@ def get_urls(
log(f"Done getting {source!r}", logger, verbose=True)
source = url_queue.get()

if interrupted.is_set():
clear_queue(url_queue)


def open_files(
file_queue: Queue,
images: List[Tuple[str, Image]],
ImageClass: type,
) -> None:
source = file_queue.get()
while not interrupted.is_set() and source:
while source:
log(f"Opening {source!r}", logger, verbose=True)
try:
images.append((source, Image(ImageClass.from_file(source))))
Expand All @@ -594,9 +580,6 @@ def open_files(
log_exception(f"Opening {source!r} failed", logger, direct=True)
source = file_queue.get()

if interrupted.is_set():
clear_queue(file_queue)


def check_arg(
name: str,
Expand Down Expand Up @@ -826,6 +809,7 @@ def main() -> None:
target=get_urls,
args=(url_queue, url_images, ImageClass),
name=f"Getter-{n}",
daemon=True,
)
for n in range(1, config_options.getters + 1)
]
Expand All @@ -836,6 +820,7 @@ def main() -> None:
target=open_files,
args=(file_queue, file_images, ImageClass),
name="Opener",
daemon=True,
)
opener_started = False

Expand All @@ -857,6 +842,7 @@ def main() -> None:
target=manage_checkers,
args=(n_checkers, dir_queue, contents, dir_images),
name="CheckManager",
daemon=True,
)
checkers_started = False

Expand Down Expand Up @@ -903,23 +889,13 @@ def main() -> None:
else:
dir_queue.put((None,) * 4)

interrupt = None
while True:
try:
if getters_started:
for getter in getters:
getter.join()
if opener_started:
opener.join()
if checkers_started:
check_manager.join()
break
except KeyboardInterrupt as e: # Ensure logs are in correct order
if not interrupt: # keep the first
interrupted.set() # Signal interruption to other threads
interrupt = e
if interrupt:
raise interrupt from None
if getters_started:
for getter in getters:
getter.join()
if opener_started:
opener.join()
if checkers_started:
check_manager.join()

notify.stop_loading()
notify.loading_interrupted.set()
Expand Down Expand Up @@ -1066,10 +1042,6 @@ def main() -> None:

logger = _logging.getLogger(__name__)

# Initially set from within `.__main__.main()`
# Will be updated from `.logging.init_log()` if multiprocessing is enabled
interrupted: Event | mp_Event | None = None

# Used by `check_dir()`
_depth: int

Expand Down
17 changes: 9 additions & 8 deletions src/termvisage/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@
import sys
import warnings
from logging.handlers import RotatingFileHandler
from multiprocessing import Event as mp_Event
from threading import Event, Thread
from threading import Event, Thread, current_thread
from typing import Optional

from term_image.widget import UrwidImageScreen

from . import cli, notify
from . import __main__, cli, notify


def init_log(
Expand Down Expand Up @@ -89,16 +88,17 @@ def init_log(
MULTI = True

if MULTI:
from . import logging_multi
from .logging_multi import process_multi_logs

process_multi_logs.started = Event()
Thread(target=process_multi_logs, name="MultiLogger").start()
logging_multi.multi_logger = Thread(
target=process_multi_logs, name="MultiLogger"
)
logging_multi.multi_logger.start()
process_multi_logs.started.wait()
del process_multi_logs.started

# Inherited by instances of `.logging_multi.Process`
cli.interrupted = mp_Event()

initialized = True


Expand Down Expand Up @@ -183,7 +183,8 @@ def _log_warning(msg, catg, fname, lineno, f=None, line=None):

def _filter(record: logging.LogRecord) -> None:
return (
record.name.partition(".")[0] not in _disallowed_modules
(not __main__.interrupted or current_thread() is __main__.MAIN_THREAD)
and record.name.partition(".")[0] not in _disallowed_modules
# Workaround for urwid screen logs
and not record.name.startswith(_urwid_screen_logger_name)
)
Expand Down
25 changes: 10 additions & 15 deletions src/termvisage/logging_multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import logging as _logging
import os
from multiprocessing import JoinableQueue, Process
from multiprocessing import Process, Queue as mp_Queue
from traceback import format_exception

from term_image import (
Expand All @@ -25,7 +25,7 @@ def process_multi_logs() -> None:
global log_queue

PID = os.getpid()
log_queue = JoinableQueue()
log_queue = mp_Queue()
process_multi_logs.started.set() # See `.logging.init_log()`

record_type, record = log_queue.get()
Expand All @@ -35,9 +35,7 @@ def process_multi_logs() -> None:
_logger.handle(record)
else:
notify.notify(*record[0], **record[1])
log_queue.task_done()
record_type, record = log_queue.get()
log_queue.task_done()


class Process(Process):
Expand All @@ -62,7 +60,6 @@ def __init__(self, *args, redirect_notifs: bool = False, **kwargs):
"logging_level": _logging.getLogger().getEffectiveLevel(),
"redirect_notifs": redirect_notifs,
}
self._main_process_interrupted = cli.interrupted
self._tui_is_initialized = tui.initialized
if self._tui_is_initialized:
self._ImageClass = tui.main.ImageClass
Expand Down Expand Up @@ -100,14 +97,9 @@ def run(self):

super().run()
except KeyboardInterrupt:
# Log only if the main process was not interrupted
if not self._main_process_interrupted.wait(0.1):
logging.log(
"Interrupted" if logging.DEBUG else f"{self.name} was interrupted",
_logger,
_logging.ERROR,
direct=False,
)
_logger.error(
"Interrupted" if logging.DEBUG else f"{self.name} was interrupted"
)
except Exception:
logging.log_exception(
"Aborted" if logging.DEBUG else f"{self.name} was aborted", _logger
Expand Down Expand Up @@ -143,7 +135,7 @@ class RedirectHandler(_logging.Handler):
`handle()` method of a logger with a different handler.
"""

def __init__(self, log_queue: JoinableQueue, *args, **kwargs) -> None:
def __init__(self, log_queue: mp_Queue, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._log_queue = log_queue

Expand All @@ -168,4 +160,7 @@ def handle(self, record: _logging.LogRecord):
}

# Set from `process_multi_logs()` in the MultiLogger thread, only in the main process
log_queue: JoinableQueue
log_queue: mp_Queue

# Set from `.logging.init_log()`.
multi_logger: logging.Thread
Loading

0 comments on commit b90ceef

Please sign in to comment.