Make "complete()" wait for all enqueued messages (#231)
Delgan committed May 2, 2020
1 parent 1122e82 commit be597bd
Showing 7 changed files with 111 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Expand Up @@ -4,6 +4,7 @@
- Remove the possibility to modify the severity ``no`` of levels once they have been added in order to prevent surprising behavior (`#209 <https://github.com/Delgan/loguru/issues/209>`_).
- Add a new ``onerror`` optional argument to ``logger.catch()``, it should be a function which will be called when an exception occurs in order to customize error handling (`#224 <https://github.com/Delgan/loguru/issues/224>`_).
- Add a new ``exclude`` optional argument to ``logger.catch()``, it should be a type of exception to be purposefully ignored and propagated to the caller without being logged (`#248 <https://github.com/Delgan/loguru/issues/248>`_).
- Modify ``complete()`` to make it callable from non-asynchronous functions, it can thus be used if ``enqueue=True`` to make sure all messages have been processed (`#231 <https://github.com/Delgan/loguru/issues/231>`_).
- Fix possible deadlock while mixing ``threading`` with ``multiprocessing`` on Linux (`#231 <https://github.com/Delgan/loguru/issues/231>`_).
- Fix the filter function listing files for ``retention`` being too restrictive, it now matches files based on the pattern ``"basename(.*).ext(.*)"`` (`#229 <https://github.com/Delgan/loguru/issues/229>`_).
- Fix the impossibility to ``remove()`` a handler if an exception is raised while the sink's ``stop()`` function is called (`#237 <https://github.com/Delgan/loguru/issues/237>`_).
Expand Down
9 changes: 7 additions & 2 deletions docs/resources/recipes.rst
Expand Up @@ -15,6 +15,8 @@ Code snippets and recipes for ``loguru``
.. |os.fork| replace:: :func:`os.fork`
.. |multiprocessing| replace:: :mod:`multiprocessing`
.. |traceback| replace:: :mod:`traceback`
.. |Thread| replace:: :class:`~threading.Thread`
.. |Process| replace:: :class:`~multiprocessing.Process`
.. |Pool| replace:: :class:`~multiprocessing.pool.Pool`
.. |Pool.map| replace:: :meth:`~multiprocessing.pool.Pool.map`
.. |Pool.apply| replace:: :meth:`~multiprocessing.pool.Pool.apply`
Expand All @@ -31,6 +33,7 @@ Code snippets and recipes for ``loguru``
.. |log| replace:: :meth:`~loguru._logger.Logger.log()`
.. |level| replace:: :meth:`~loguru._logger.Logger.level()`
.. |configure| replace:: :meth:`~loguru._logger.Logger.configure()`
.. |complete| replace:: :meth:`~loguru._logger.Logger.complete()`

.. _`unicode`: https://docs.python.org/3/howto/unicode.html

Expand Down Expand Up @@ -631,6 +634,7 @@ On Linux, thanks to |os.fork| there is no pitfall while using the ``logger`` ins

    def my_process():
        logger.info("Executing function in child process")
        logger.complete()

    if __name__ == "__main__":
        logger.add("file.log", enqueue=True)
Expand All @@ -649,6 +653,7 @@ Things get a little more complicated on Windows. Indeed, this operating system d

    def my_process(logger_):
        logger_.info("Executing function in child process")
        logger_.complete()

    if __name__ == "__main__":
        logger.remove()  # Default "sys.stderr" sink is not picklable
Expand All @@ -660,7 +665,7 @@ Things get a little more complicated on Windows. Indeed, this operating system d

        logger.info("Done")

Windows requires the added sinks to be picklable, otherwise an error is raised while creating the child process. Many stream objects like standard output and file descriptors are not picklable. In such cases, the ``enqueue=True`` argument is required as it will allow the child process to only inherit the ``Queue`` where logs are sent.
Windows requires the added sinks to be picklable, otherwise an error is raised while creating the child process. Many stream objects like standard output and file descriptors are not picklable. In such cases, the ``enqueue=True`` argument is required as it will allow the child process to only inherit the queue object where logs are sent.

The |multiprocessing| library is also commonly used to start a pool of workers using, for example, |Pool.map| or |Pool.apply|. Again, it will work flawlessly on Linux, but it will require some tinkering on Windows. You will probably not be able to pass the ``logger`` as an argument for your worker functions because it needs to be picklable, and although handlers added using ``enqueue=True`` are "inheritable", they are not "picklable". Instead, you will need to make use of the ``initializer`` and ``initargs`` parameters while creating the |Pool| object in a way allowing your workers to access the shared ``logger``. You can either assign it to a class attribute or override the global logger of your child processes:

Expand Down Expand Up @@ -715,4 +720,4 @@ The |multiprocessing| library is also commonly used to start a pool of workers u
        logger.info("Done")
Independently of the operating system, note that the process in which a handler is added with ``enqueue=True`` is in charge of the ``Queue`` internally used. This means that you should avoid calling ``.remove()`` on such a handler from the parent process if any child is likely to continue using it.
Independently of the operating system, note that the process in which a handler is added with ``enqueue=True`` is in charge of the queue internally used. This means that you should avoid calling ``.remove()`` on such a handler from the parent process if any child is likely to continue using it. More importantly, note that a |Thread| is started internally to consume the queue. Therefore, it is recommended to call |complete| before leaving a |Process| to make sure the queue is left in a stable state.
5 changes: 4 additions & 1 deletion loguru/__init__.pyi
Expand Up @@ -6,6 +6,7 @@
.. |Logger| replace:: :class:`~loguru._logger.Logger`
.. |catch| replace:: :meth:`~loguru._logger.Logger.catch()`
.. |contextualize| replace:: :meth:`~loguru._logger.Logger.contextualize()`
.. |complete| replace:: :meth:`~loguru._logger.Logger.complete()`
.. |bind| replace:: :meth:`~loguru._logger.Logger.bind()`
.. |patch| replace:: :meth:`~loguru._logger.Logger.patch()`
.. |opt| replace:: :meth:`~loguru._logger.Logger.opt()`
Expand Down Expand Up @@ -58,6 +59,7 @@ listed here and might be useful to type hint your code:
attributes).
- ``Catcher``: the context decorator returned by |catch|.
- ``Contextualizer``: the context decorator returned by |contextualize|.
- ``AwaitableCompleter``: the awaitable object returned by |complete|.
- ``RecordFile``: the ``record["file"]`` with ``name`` and ``path`` attributes.
- ``RecordLevel``: the ``record["level"]`` with ``name``, ``no`` and ``icon`` attributes.
- ``RecordThread``: the ``record["thread"]`` with ``id`` and ``name`` attributes.
Expand Down Expand Up @@ -116,6 +118,7 @@ class _GeneratorContextManager(ContextManager[_T], Generic[_T]):

Catcher = _GeneratorContextManager[None]
Contextualizer = _GeneratorContextManager[None]
AwaitableCompleter = Awaitable

class Level(NamedTuple):
    name: str
Expand Down Expand Up @@ -255,7 +258,7 @@ class Logger:
        **kwargs: Any
    ) -> int: ...
    def remove(self, handler_id: Optional[int] = ...) -> None: ...
    async def complete(self) -> None: ...
    def complete(self) -> AwaitableCompleter: ...
    @overload
    def catch(
        self,
Expand Down
26 changes: 15 additions & 11 deletions loguru/_handler.py
Expand Up @@ -64,6 +64,7 @@ def __init__(
        self._lock = create_handler_lock()
        self._queue = None
        self._confirmation_event = None
        self._confirmation_lock = None
        self._thread = None
        self._stopped = False
        self._owner_process = None
Expand All @@ -84,6 +85,7 @@ def __init__(
            self._owner_process = multiprocessing.current_process()
            self._queue = multiprocessing.SimpleQueue()
            self._confirmation_event = multiprocessing.Event()
            self._confirmation_lock = multiprocessing.Lock()
            self._thread = Thread(
                target=self._queued_writer, daemon=True, name="loguru-writer-%d" % self._id
            )
Expand Down Expand Up @@ -189,18 +191,20 @@ def stop(self):

            self._sink.stop()

    async def complete(self):
        with self._lock:
            # If "enqueue=True", we need first to empty the queue and make sure all enqueued records
            # are converted to tasks and correctly scheduled before awaiting them with "complete()"
            # (otherwise they might be never awaited).
            if self._enqueue:
                if self._owner_process != multiprocessing.current_process():
                    return
                self._queue.put(True)
                self._confirmation_event.wait()
                self._confirmation_event.clear()
    def complete_queue(self):
        if not self._enqueue:
            return

        with self._confirmation_lock:
            self._queue.put(True)
            self._confirmation_event.wait()
            self._confirmation_event.clear()

    async def complete_async(self):
        if self._enqueue and self._owner_process != multiprocessing.current_process():
            return

        with self._lock:
            await self._sink.complete()

    def update_format(self, level_id):
Expand Down
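The handshake introduced in ``complete_queue()`` above can be illustrated with a minimal, standard-library-only sketch. It is not loguru's implementation: ``QueuedWriter``, ``emit()`` and ``stop()`` are hypothetical names, and ``threading`` primitives are assumed in place of the ``multiprocessing`` ones used by the handler. The idea is the same, though: a ``True`` sentinel is put on the queue, the writer thread sets an event when it reaches the sentinel, and a lock keeps concurrent callers from consuming each other's confirmation.

```python
import queue
import threading


class QueuedWriter:
    """Toy stand-in for a handler added with enqueue=True (hypothetical class)."""

    def __init__(self, sink):
        self._sink = sink
        self._queue = queue.SimpleQueue()
        self._confirmation_event = threading.Event()
        self._confirmation_lock = threading.Lock()
        self._thread = threading.Thread(target=self._queued_writer, daemon=True)
        self._thread.start()

    def emit(self, message):
        # Messages are only enqueued here; the writer thread calls the sink later.
        self._queue.put(message)

    def complete_queue(self):
        # One caller at a time: the event is shared, so a second caller could
        # otherwise clear or consume a confirmation meant for the first one.
        with self._confirmation_lock:
            self._queue.put(True)             # sentinel: "confirm once you reach me"
            self._confirmation_event.wait()   # block until the writer has seen it
            self._confirmation_event.clear()  # reset the event for the next caller

    def stop(self):
        self._queue.put(None)                 # sentinel: "terminate"
        self._thread.join()

    def _queued_writer(self):
        while True:
            item = self._queue.get()
            if item is None:
                break
            if item is True:
                self._confirmation_event.set()
                continue
            self._sink(item)


written = []
writer = QueuedWriter(written.append)
for i in range(5):
    writer.emit("message %d" % i)
writer.complete_queue()  # returns only after the five messages reached the sink
writer.stop()
```

Because the queue is first-in, first-out and a single thread consumes it, the sentinel is reached only after every message enqueued before it has been handed to the sink.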
49 changes: 38 additions & 11 deletions loguru/_logger.py
Expand Up @@ -1001,21 +1001,27 @@ def remove(self, handler_id=None):

                handler.stop()

    async def complete(self):
        """Wait for the end of the asynchronous tasks scheduled by coroutine handlers.
    def complete(self):
        """Wait for the end of enqueued messages and asynchronous tasks scheduled by handlers.
        This method should be awaited before the end of a coroutine executed by |asyncio.run| or
        |loop.run_until_complete| to ensure all asynchronous logging messages are processed. The
        function |asyncio.get_event_loop| is called beforehand, only tasks scheduled in the same
        loop that the current one will be awaited by the method.
        This method proceeds in two steps: first it waits for all logging messages added to handlers
        with ``enqueue=True`` to be processed, then it returns an object that can be awaited to
        finalize all logging tasks added to the event loop by coroutine sinks.
        It only applies to coroutine functions added as sinks, awaiting this method does nothing if
        there is no such sink attached to the logger.
        It can be called from non-asynchronous code. This is especially recommended when the
        ``logger`` is utilized with ``multiprocessing`` to ensure messages put to the internal
        queue have been properly transmitted before leaving a child process.
        The returned object should be awaited before the end of a coroutine executed by
        |asyncio.run| or |loop.run_until_complete| to ensure all asynchronous logging messages are
        processed. The function |asyncio.get_event_loop| is called beforehand, only tasks scheduled
        in the same loop that the current one will be awaited by the method.
        Returns
        -------
        :term:`coroutine`
            A coroutine which ensures all asynchronous logging calls are completed when awaited.
        :term:`awaitable`
            An awaitable object which ensures all asynchronous logging calls are completed when
            awaited.
        Examples
        --------
Expand All @@ -1033,11 +1039,32 @@ async def complete(self):
        >>> asyncio.run(work())
        Start
        End
        >>> def process():
        ...     logger.info("Message sent from the child")
        ...     logger.complete()
        ...
        >>> logger.add(sys.stderr, enqueue=True)
        1
        >>> process = multiprocessing.Process(target=process)
        >>> process.start()
        >>> process.join()
        Message sent from the child
        """

        with self._core.lock:
            handlers = self._core.handlers.copy()
            for handler in handlers.values():
                await handler.complete()
                handler.complete_queue()

        class AwaitableCompleter:
            def __await__(self_):
                with self._core.lock:
                    handlers = self._core.handlers.copy()
                    for handler in handlers.values():
                        yield from handler.complete_async().__await__()

        return AwaitableCompleter()

def catch(
self,
Expand Down
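The two-step behaviour of the new ``complete()`` (blocking work done at call time, asynchronous work deferred until the result is awaited) can be sketched with ``asyncio`` alone. This is only an illustration under stated assumptions: ``Flusher`` and its counters are hypothetical and stand in for the logger and its handlers. Only the shape of ``AwaitableCompleter.__await__`` mirrors the diff.

```python
import asyncio


class Flusher:
    """Hypothetical object whose complete() works from sync and async code."""

    def __init__(self):
        self.sync_flushed = 0
        self.async_flushed = 0

    def _flush_queue(self):
        # Blocking step, performed immediately when complete() is called.
        self.sync_flushed += 1

    async def _flush_tasks(self):
        # Asynchronous step, performed only if the returned object is awaited.
        await asyncio.sleep(0)
        self.async_flushed += 1

    def complete(self):
        self._flush_queue()
        outer = self

        class AwaitableCompleter:
            def __await__(self):
                # Generator-based __await__: the coroutine is created lazily,
                # so nothing is left un-awaited when complete() is called
                # from synchronous code and the result is discarded.
                yield from outer._flush_tasks().__await__()

        return AwaitableCompleter()


flusher = Flusher()
flusher.complete()  # plain call: only the blocking step runs


async def main():
    await flusher.complete()  # blocking step again, then the asynchronous step


asyncio.run(main())
print(flusher.sync_flushed, flusher.async_flushed)  # prints "2 1"
```

Returning a small awaitable object instead of declaring ``complete()`` as ``async def`` is what lets the same call be useful without an event loop: an ``async def`` would do nothing at all until awaited.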
19 changes: 19 additions & 0 deletions tests/test_add_option_enqueue.py
Expand Up @@ -2,6 +2,7 @@
import pytest
import time
import re
import sys


class NotPicklable:
Expand Down Expand Up @@ -164,3 +165,21 @@ def test_not_caught_exception_sink_write(capsys):
    assert out == "It's fine\n"
    assert lines[0].startswith("Exception")
    assert lines[-1] == "RuntimeError: You asked me to fail..."


def test_wait_for_all_messages_enqueued(capsys):
    def slow_sink(message):
        time.sleep(0.01)
        sys.stderr.write(message)

    logger.add(slow_sink, enqueue=True, catch=False, format="{message}")

    for i in range(10):
        logger.info(i)

    logger.complete()

    out, err = capsys.readouterr()

    assert out == ""
    assert err == "".join("%d\n" % i for i in range(10))
27 changes: 27 additions & 0 deletions tests/test_multiprocessing.py
Expand Up @@ -7,6 +7,7 @@
import time
import pytest
import loguru
import platform
from loguru import logger


Expand Down Expand Up @@ -582,3 +583,29 @@ def worker():
    logger_.remove()

    assert output.read() in ("Main\nChild\n", "Child\nMain\n")


@pytest.mark.skipif(os.name == "nt", reason="Windows does not support forking")
@pytest.mark.skipif(platform.python_implementation() == "PyPy", reason="PyPy is too slow")
def test_complete_from_multiple_child_processes(capsys):
    logger.add(lambda _: None, enqueue=True, catch=False)

    barrier = multiprocessing.Barrier(100)

    def worker(barrier):
        barrier.wait()
        logger.complete()

    processes = []

    for _ in range(100):
        process = multiprocessing.Process(target=worker, args=(barrier,))
        process.start()
        processes.append(process)

    for process in processes:
        process.join(2)
        assert process.exitcode == 0

    out, err = capsys.readouterr()
    assert out == err == ""
