Avoid deadlock caused by generational GC and object finalizers #720

Merged
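For context, a minimal sketch of the failure mode this PR guards against, reconstructed from issue #712 and the regression test below; the Cycle class and sink here are illustrative, not code from the PR:

import gc

from loguru import logger


class Cycle:
    def __init__(self):
        self.ref = self  # a cyclic isolate: only the generational GC can reclaim it

    def __del__(self):
        # May run whenever a GC pass happens to fire, on whichever thread triggered
        # the allocation, including while that thread already holds the handler lock
        # of an in-progress log call.
        logger.info("finalizer log")


def sink(message):
    gc.collect()  # stands in for any allocation that happens to trigger a GC pass
    print(message, end="")


logger.add(sink)
Cycle()
logger.info("outer")  # before this PR: deadlock; after: the nested call fails fast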
30 changes: 26 additions & 4 deletions loguru/_handler.py
@@ -2,6 +2,8 @@
import json
import multiprocessing
import os
import threading
from contextlib import contextmanager
from threading import Thread

from ._colorizer import Colorizer
@@ -64,6 +66,7 @@ def __init__(

        self._stopped = False
        self._lock = create_handler_lock()
        self._lock_acquired = threading.local()
        self._queue = None
        self._confirmation_event = None
        self._confirmation_lock = None
Expand Down Expand Up @@ -95,6 +98,24 @@ def __init__(
    def __repr__(self):
        return "(id=%d, level=%d, sink=%s)" % (self._id, self._levelno, self._name)

    @contextmanager
    def _protected_lock(self):
        """Acquire the lock, but fail fast if it's already acquired by the current thread."""
        if getattr(self._lock_acquired, "acquired", False):
            raise RuntimeError(
                "Could not acquire internal lock because it was already in use (deadlock avoided). "
                "This likely happened because the logger was re-used inside a sink, a signal "
                "handler or a '__del__' method. This is not permitted because the logger and its "
                "handlers are not re-entrant."
            )
        self._lock_acquired.acquired = True
        try:
            self._lock.acquire()
            yield
        finally:
            self._lock.release()
            self._lock_acquired.acquired = False

Review thread on the try / finally block:

Delgan (Owner):

I would replace this block with:

self._lock_acquired.acquired = True
with self._lock:
    yield
self._lock_acquired.acquired = False

Is there any reason to use try / finally?

Contributor Author:

I think so. If an exception is raised, then the lock would be released, but self._lock_acquired.acquired would still be True.

Try this fake example:

from contextlib import contextmanager

@contextmanager
def fake_lock():
    try:
        print("lock acquired")
        yield
    finally:
        print("lock released")


@contextmanager
def fake_protected_lock():
    print("lock state set to True")
    with fake_lock():
        yield
    print("lock state set to False")


with fake_protected_lock():
    raise ValueError

Delgan (Owner):

My bad, I don't know why I assumed it was automatically handled, I'm probably too used to pytest fixtures. The try / finally is required, as explained in the docs I shared myself... 🤦‍♂️

Thanks for the example. 👍
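For illustration only (not part of the diff), a sketch of the fail-fast behavior for the re-use cases named in the error message; with catch=False the nested call surfaces the RuntimeError instead of deadlocking:

from loguru import logger

logger.remove()  # drop the default stderr handler so only the re-entrant sink remains

def nested_sink(message):
    logger.info("re-entrant call")  # re-enters the same handler while its lock is held

logger.add(nested_sink, catch=False)
logger.info("trigger")  # raises RuntimeError("Could not acquire internal lock ...")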

    def emit(self, record, level_id, from_decorator, is_raw, colored_message):
        try:
            if self._levelno > record["level"].no:
@@ -168,21 +189,20 @@ def emit(self, record, level_id, from_decorator, is_raw, colored_message):
            str_record = Message(formatted)
            str_record.record = record

-            with self._lock:
+            with self._protected_lock():
                if self._stopped:
                    return
                if self._enqueue:
                    self._queue.put(str_record)
                else:
                    self._sink.write(str_record)

        except Exception:
            if not self._error_interceptor.should_catch():
                raise
            self._error_interceptor.print(record)

    def stop(self):
-        with self._lock:
+        with self._protected_lock():
            self._stopped = True
            if self._enqueue:
                if self._owner_process_pid != os.getpid():
@@ -207,7 +227,7 @@ async def complete_async(self):
        if self._enqueue and self._owner_process_pid != os.getpid():
            return

-        with self._lock:
+        with self._protected_lock():
            await self._sink.complete()

def update_format(self, level_id):
Expand Down Expand Up @@ -297,6 +317,7 @@ def _queued_writer(self):
    def __getstate__(self):
        state = self.__dict__.copy()
        state["_lock"] = None
        state["_lock_acquired"] = None
Delgan (Owner), Oct 10, 2022:

Very good catch, I assumed threading.local() was cleaned when cloned in a new process, but it's not the case actually.

Contributor Author:

It was well covered in tests 💪
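
A minimal sketch of the pitfall discussed above (assuming a platform where the "fork" start method is available, e.g. Linux; this snippet is illustrative and not part of the PR): state stored in a threading.local survives into a forked child process, which is why the handler must reset it explicitly.

import multiprocessing
import threading

flag = threading.local()
flag.acquired = True

def child():
    # The forked child inherits the forking thread's local state as-is.
    print(getattr(flag, "acquired", False))  # prints: True

if __name__ == "__main__":
    ctx = multiprocessing.get_context("fork")
    p = ctx.Process(target=child)
    p.start()
    p.join()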

state["_memoize_dynamic_format"] = None
if self._enqueue:
state["_sink"] = None
@@ -307,6 +328,7 @@
    def __setstate__(self, state):
        self.__dict__.update(state)
        self._lock = create_handler_lock()
        self._lock_acquired = threading.local()
        if self._is_formatter_dynamic:
            if self._colorize:
                self._memoize_dynamic_format = memoize(prepare_colored_format)
69 changes: 69 additions & 0 deletions tests/test_locks.py
@@ -0,0 +1,69 @@
import gc
from typing import Optional

import pytest

from loguru import logger


class CyclicReference:
    """A minimal cyclic reference.

    Cyclical references are garbage collected using the generational collector rather than
    via reference counting. This is important here, because the generational collector runs
    periodically, meaning that it is hard to predict when the stack will be overtaken by a
    garbage collection process - but it will almost always be when allocating memory of some
    kind.

    When this object is garbage-collected, a log will be emitted.
    """

    def __init__(self, _other: Optional["CyclicReference"] = None):
        self.other = _other or CyclicReference(_other=self)

    def __del__(self):
        logger.info("tearing down")

def perform_full_gc():
    for generation in range(3):
        gc.collect(generation=generation)


@pytest.fixture()
def _remove_cyclic_references():
    """Prevent cyclic isolate finalizers from bleeding into other tests."""
    try:
        yield
    finally:
        perform_full_gc()


def test_no_deadlock_on_generational_garbage_collection(_remove_cyclic_references):
    """Regression test for https://github.com/Delgan/loguru/issues/712

    Assert that deadlocks do not occur when a cyclic isolate containing log output in
    finalizers is collected by generational GC, during the output of another log message.
    """

    # GIVEN a sink which allocates some memory
    output = []

    def sink(message):
        # The generational GC could be triggered here by any memory allocation, but we
        # trigger it explicitly to avoid a flaky test.
        # See https://github.com/Delgan/loguru/issues/712
        perform_full_gc()

        # Actually write the message somewhere
        output.append(message)

    logger.add(sink, colorize=False)

    # WHEN there are cyclic isolates in memory which log on GC
    # AND logs are produced long enough to trigger generational GC
    for _ in range(1000):
        CyclicReference()
        logger.info("test")

    # THEN deadlock should not be reached
    assert True
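
As a side note on why the test loops 1000 times: the generational collector runs implicitly once enough allocations accumulate, which is exactly what makes the deadlock timing unpredictable. A small standalone sketch (independent of loguru) to observe those implicit runs:

import gc

print(gc.get_threshold())  # (700, 10, 10) by default: gen-0 collects after ~700 net allocations

def on_gc(phase, info):
    if phase == "start":
        print("collecting generation", info["generation"])

gc.callbacks.append(on_gc)
junk = [object() for _ in range(5000)]  # ordinary allocations trigger implicit collections
gc.callbacks.remove(on_gc)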