-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathlogging.py
209 lines (158 loc) · 6.03 KB
/
logging.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
"""
Logging module.
"""
import inspect
import logging
from typing import Any
import psutil
from loguru import logger
from ch_backup.formatting import format_size
class Filter:
    """
    Filter for a loguru handler.

    Passes only records whose bound ``logger_name`` matches the name this
    filter was created with.
    """

    def __init__(self, name):
        self._name = name

    def __call__(self, record):
        """
        Filter callback to decide for each logged message whether it should be sent to the sink or not.
        """
        bound_name = record["extra"].get("logger_name")
        return bound_name == self._name
def make_filter(name):
    """
    Factory for filter creation.

    Returns a :class:`Filter` bound to the given handler name.
    """
    filter_instance = Filter(name)
    return filter_instance
def _create_formatter(fmt):
message_head = 800
message_tail = 300
def _format(record):
result_fmt = fmt
message_length = len(record["message"])
if message_length > message_head + message_tail:
tail_length = min(message_length - message_head, message_tail)
record["extra"]["message_tail"] = record["message"][-tail_length:]
record["message"] = record["message"][:message_head]
skipped_characters = message_length - message_head - tail_length
result_fmt += f" ...(skipped {skipped_characters} characters)... {{extra[message_tail]}}"
# Adding '\n{exception}' to dynamic formatters is required by loguru docs
return result_fmt + "\n{exception}"
return _format
class InterceptHandler(logging.Handler):
    """
    Helper class for logging interception.

    Installed as a stdlib ``logging`` handler so that every record emitted
    through ``logging`` is re-emitted through loguru.
    """

    def emit(self, record: logging.LogRecord) -> None:
        """
        Intercept all records from the logging module and redirect them into loguru.
        The handler for loguru will be chosen based on module name.
        """
        # Get the corresponding loguru level if it exists; otherwise fall
        # back to the numeric stdlib level.
        # Fix: the original annotation `int or str` evaluates to plain `int`
        # (requiring a type: ignore); a union annotation is the correct form
        # and is never evaluated at runtime for function-local names.
        level: "int | str"
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        # Find the caller from where the logged message originated, skipping
        # the frames that belong to the stdlib logging module itself so the
        # reported source location is the real call site.
        frame, depth = inspect.currentframe(), 0
        while frame and (depth == 0 or frame.f_code.co_filename == logging.__file__):
            frame = frame.f_back
            depth += 1

        logger.bind(logger_name=record.name).opt(
            depth=depth, exception=record.exc_info
        ).log(level, record.getMessage())
def configure(config_loguru: dict) -> None:
    """
    Configure logger.

    Builds loguru handlers from the ``handlers`` and ``formatters`` sections
    of the given config, then intercepts the stdlib ``logging`` module so its
    records are routed into loguru as well.
    """
    # Configure loguru.
    loguru_handlers = []
    for name, value in config_loguru["handlers"].items():
        handler = {
            "sink": value["sink"],
            # Dynamic formatter that truncates very long messages; see _create_formatter.
            "format": _create_formatter(config_loguru["formatters"][value["format"]]),
            "enqueue": True,
            "diagnose": False,
        }
        if "level" in value:
            handler["level"] = value["level"]
        if "filter" in value:
            handler["filter"] = value["filter"]
            # https://loguru.readthedocs.io/en/stable/api/logger.html#message
            # One can also pass a dict mapping module names to minimum required level. In such case, each log record will search for it’s closest parent in the dict
            # and use the associated level as the filter.
            # In order to set a default level, the "" module name should be used as it is the parent of all modules (it does not suppress global level threshold, though).
            # NOTE(review): this assumes the configured filter is a dict (and mutates
            # the caller's config in place) — confirm against the config schema.
            handler["filter"][""] = False
        else:
            # No explicit filter: only pass records explicitly bound to this handler's name.
            handler["filter"] = make_filter(name)
        loguru_handlers.append(handler)

    logger.configure(
        handlers=loguru_handlers,  # type: ignore
        activation=[("", True)],
    )

    # Configure logging: route all stdlib records (level 0 = everything) into loguru.
    logging.basicConfig(handlers=[InterceptHandler()], level=0)
def critical(msg, *args, **kwargs):
    """
    Log a message with severity 'CRITICAL'.
    """
    log = getLogger("ch-backup").opt(exception=kwargs.get("exc_info", False))
    log.critical(msg, *args, **kwargs)
def error(msg, *args, **kwargs):
    """
    Log a message with severity 'ERROR'.
    """
    log = getLogger("ch-backup").opt(exception=kwargs.get("exc_info", False))
    log.error(msg, *args, **kwargs)
def exception(msg, *args, **kwargs):
    """
    Log a message with severity 'ERROR' with exception information.

    Unless overridden via ``exc_info=False``, the current exception is
    attached to the logged message.
    """
    with_exception = kwargs.get("exc_info", True)
    # Bug fix: this previously logged at DEBUG level, contradicting the
    # documented ERROR severity and hiding exceptions from ERROR-level sinks.
    getLogger("ch-backup").opt(exception=with_exception).error(msg, *args, **kwargs)
def warning(msg, *args, **kwargs):
    """
    Log a message with severity 'WARNING'.
    """
    log = getLogger("ch-backup").opt(exception=kwargs.get("exc_info", False))
    log.warning(msg, *args, **kwargs)
def info(msg, *args, **kwargs):
    """
    Log a message with severity 'INFO'.
    """
    log = getLogger("ch-backup").opt(exception=kwargs.get("exc_info", False))
    log.info(msg, *args, **kwargs)
def debug(msg, *args, **kwargs):
    """
    Log a message with severity 'DEBUG'.
    """
    log = getLogger("ch-backup").opt(exception=kwargs.get("exc_info", False))
    log.debug(msg, *args, **kwargs)
def memory_usage():
    """
    Log memory usage information.

    It's assumed that a big amount of memory is shared across main and worker
    processes, so shared memory is accounted only for the main process.
    """
    try:
        main_process = psutil.Process()
        main_rss = main_process.memory_info().rss

        # Sum the non-shared resident memory of all child (worker) processes.
        workers_rss = 0
        for child in main_process.children():
            child_mem = child.memory_info()
            workers_rss += child_mem.rss - child_mem.shared

        debug(
            "Memory usage: {} (main process: {}, worker processes: {})",
            format_size(main_rss + workers_rss),
            format_size(main_rss),
            format_size(workers_rss),
        )
    except Exception:
        # Best effort only: never let a metrics failure break the backup.
        warning("Unable to get memory usage", exc_info=True)
# pylint: disable=invalid-name
def getLogger(name: str) -> Any:
    """
    Get logger with specific name.

    Returns a loguru logger bound with ``logger_name`` so handler filters
    can route its records to the matching sink.
    """
    bound_logger = logger.bind(logger_name=name)
    return bound_logger