-
Notifications
You must be signed in to change notification settings - Fork 502
/
worker.py
141 lines (119 loc) · 4.36 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import os
import threading
from time import sleep, time
from sentry_sdk._queue import Queue, FullError
from sentry_sdk.utils import logger
from sentry_sdk.consts import DEFAULT_QUEUE_SIZE
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any
from typing import Optional
from typing import Callable
_TERMINATOR = object()
class BackgroundWorker:
def __init__(self, queue_size=DEFAULT_QUEUE_SIZE):
# type: (int) -> None
self._queue = Queue(queue_size) # type: Queue
self._lock = threading.Lock()
self._thread = None # type: Optional[threading.Thread]
self._thread_for_pid = None # type: Optional[int]
@property
def is_alive(self):
# type: () -> bool
if self._thread_for_pid != os.getpid():
return False
if not self._thread:
return False
return self._thread.is_alive()
def _ensure_thread(self):
# type: () -> None
if not self.is_alive:
self.start()
def _timed_queue_join(self, timeout):
# type: (float) -> bool
deadline = time() + timeout
queue = self._queue
queue.all_tasks_done.acquire()
try:
while queue.unfinished_tasks:
delay = deadline - time()
if delay <= 0:
return False
queue.all_tasks_done.wait(timeout=delay)
return True
finally:
queue.all_tasks_done.release()
def start(self):
# type: () -> None
with self._lock:
if not self.is_alive:
self._thread = threading.Thread(
target=self._target, name="sentry-sdk.BackgroundWorker"
)
self._thread.daemon = True
try:
self._thread.start()
self._thread_for_pid = os.getpid()
except RuntimeError:
# At this point we can no longer start because the interpreter
# is already shutting down. Sadly at this point we can no longer
# send out events.
self._thread = None
def kill(self):
# type: () -> None
"""
Kill worker thread. Returns immediately. Not useful for
waiting on shutdown for events, use `flush` for that.
"""
logger.debug("background worker got kill request")
with self._lock:
if self._thread:
try:
self._queue.put_nowait(_TERMINATOR)
except FullError:
logger.debug("background worker queue full, kill failed")
self._thread = None
self._thread_for_pid = None
def flush(self, timeout, callback=None):
# type: (float, Optional[Any]) -> None
logger.debug("background worker got flush request")
with self._lock:
if self.is_alive and timeout > 0.0:
self._wait_flush(timeout, callback)
logger.debug("background worker flushed")
def full(self):
# type: () -> bool
return self._queue.full()
def _wait_flush(self, timeout, callback):
# type: (float, Optional[Any]) -> None
initial_timeout = min(0.1, timeout)
if not self._timed_queue_join(initial_timeout):
pending = self._queue.qsize() + 1
logger.debug("%d event(s) pending on flush", pending)
if callback is not None:
callback(pending, timeout)
if not self._timed_queue_join(timeout - initial_timeout):
pending = self._queue.qsize() + 1
logger.error("flush timed out, dropped %s events", pending)
def submit(self, callback):
# type: (Callable[[], None]) -> bool
self._ensure_thread()
try:
self._queue.put_nowait(callback)
return True
except FullError:
return False
def _target(self):
# type: () -> None
while True:
callback = self._queue.get()
try:
if callback is _TERMINATOR:
break
try:
callback()
except Exception:
logger.error("Failed processing job", exc_info=True)
finally:
self._queue.task_done()
sleep(0)