-
Notifications
You must be signed in to change notification settings - Fork 58
/
multiprocessing_logging.py
127 lines (97 loc) · 3.83 KB
/
multiprocessing_logging.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# vim : fileencoding=UTF-8 :
from __future__ import absolute_import, division, unicode_literals
import logging
import multiprocessing
import threading
try:
from queue import Empty
except ImportError: # Python 2.
from Queue import Empty # type: ignore[no-redef]
__version__ = "0.3.4"
def install_mp_handler(logger=None):
"""Wraps the handlers in the given Logger with an MultiProcessingHandler.
:param logger: whose handlers to wrap. By default, the root logger.
"""
if logger is None:
logger = logging.getLogger()
for i, orig_handler in enumerate(list(logger.handlers)):
handler = MultiProcessingHandler("mp-handler-{0}".format(i), sub_handler=orig_handler)
logger.removeHandler(orig_handler)
logger.addHandler(handler)
def uninstall_mp_handler(logger=None):
"""Unwraps the handlers in the given Logger from a MultiProcessingHandler wrapper
:param logger: whose handlers to unwrap. By default, the root logger.
"""
if logger is None:
logger = logging.getLogger()
for handler in list(logger.handlers):
if isinstance(handler, MultiProcessingHandler):
orig_handler = handler.sub_handler
logger.removeHandler(handler)
logger.addHandler(orig_handler)
class MultiProcessingHandler(logging.Handler):
def __init__(self, name, sub_handler=None):
super(MultiProcessingHandler, self).__init__()
if sub_handler is None:
sub_handler = logging.StreamHandler()
self.sub_handler = sub_handler
self.setLevel(self.sub_handler.level)
self.setFormatter(self.sub_handler.formatter)
self.filters = self.sub_handler.filters
self.queue = multiprocessing.Queue(-1)
self._is_closed = False
# The thread handles receiving records asynchronously.
self._receive_thread = threading.Thread(target=self._receive, name=name)
self._receive_thread.daemon = True
self._receive_thread.start()
def setFormatter(self, fmt):
super(MultiProcessingHandler, self).setFormatter(fmt)
self.sub_handler.setFormatter(fmt)
def _receive(self):
while True:
try:
if self._is_closed and self.queue.empty():
break
record = self.queue.get(timeout=0.2)
self.sub_handler.emit(record)
except (KeyboardInterrupt, SystemExit):
raise
except (EOFError, OSError):
break # The queue was closed by child?
except Empty:
pass # This periodically checks if the logger is closed.
except:
from sys import stderr
from traceback import print_exc
print_exc(file=stderr)
raise
self.queue.close()
self.queue.join_thread()
def _send(self, s):
self.queue.put_nowait(s)
def _format_record(self, record):
# ensure that exc_info and args
# have been stringified. Removes any chance of
# unpickleable things inside and possibly reduces
# message size sent over the pipe.
if record.args:
record.msg = record.msg % record.args
record.args = None
if record.exc_info:
self.format(record)
record.exc_info = None
return record
def emit(self, record):
try:
s = self._format_record(record)
self._send(s)
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
def close(self):
if not self._is_closed:
self._is_closed = True
self._receive_thread.join(5.0) # Waits for receive queue to empty.
self.sub_handler.close()
super(MultiProcessingHandler, self).close()