-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpoller.py
80 lines (61 loc) · 1.88 KB
/
poller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import importers
import logging
import pkgutil
import storage
import time
import threading
import memory
from importers import download
# Seconds a worker waits between two consecutive polls (ten minutes).
POLL_SLEEP_INTERVAL_SECONDS = 10 * 60
# Start-up stagger step in seconds; multiplied by a worker's index.
POLL_SLEEP_INCREMENT_SECONDS = 60
# Registry that every Worker appends itself to on construction.
workers = []
# Logger named after this module.
logger = logging.getLogger(__name__)
class Worker(threading.Thread):
    """Background thread that periodically polls a single importer.

    Each instance registers itself in the module-level ``workers`` list and
    uses its position there to stagger its first poll, so that workers do
    not all start polling at the same moment.
    """

    def __init__(self, importer):
        """Create a worker for *importer* and register it in ``workers``.

        *importer* must expose ``__name__`` (used in log messages) and a
        ``poll()`` callable.
        """
        super(Worker, self).__init__()
        self.importer = importer
        self.running = True
        # Position in the registry; scales the staggered start delay.
        self.index = len(workers)
        workers.append(self)

    def run(self):
        """Thread body: wait out the staggered start, then poll until stopped."""
        delay = POLL_SLEEP_INCREMENT_SECONDS * self.index
        logger.info(
            "Staggered start %s for %d seconds", self.importer.__name__, delay
        )
        # Interruptible wait, so stop() is honoured during the stagger too
        # instead of blocking for the whole delay.
        self._sleep_while_running(delay)
        while self.running:
            self.sleep()
            self.poll()
            memory.check(memory.GB / 2)

    def sleep(self):
        """Wait POLL_SLEEP_INTERVAL_SECONDS, returning early once stopped."""
        delay = POLL_SLEEP_INTERVAL_SECONDS
        logger.info("Sleeping %s for %d seconds", self.importer.__name__, delay)
        self._sleep_while_running(delay)

    def _sleep_while_running(self, seconds):
        """Sleep up to *seconds* in one-second steps while ``running`` is set.

        Returns the value of ``self.running`` when the wait ends, i.e.
        False when the worker has been stopped.
        """
        for _ in range(seconds):
            if not self.running:
                break
            time.sleep(1)
        return self.running

    def poll(self):
        """Poll the importer once, then poll storage; do nothing if stopped.

        An exception raised by the importer is logged with its traceback
        and swallowed, so storage is still polled afterwards and the
        worker loop keeps going.
        """
        logger.info('Poll running=%s', self.running)
        if not self.running:
            return
        logger.info('Polling importers')
        try:
            self.importer.poll()
        except Exception as e:
            # Deliberate best-effort: one failing importer must not kill
            # the worker thread.
            logger.exception(
                'POLLER: Error polling %s: %s', self.importer.__name__, e
            )
        else:
            logger.debug('Polling %s', self.importer.__name__)
        logger.info('Polling Storage')
        storage.Storage.poll()

    def stop(self):
        """Request the worker to stop; waits and the run loop notice it."""
        self.running = False
# Build one Worker per importer submodule that is already available as an
# attribute of the ``importers`` package.  Submodules are discovered through
# the package's own ``__path__`` rather than the relative directory name
# 'importers', so discovery does not depend on the current working directory.
# NOTE: Worker.__init__ also appends each instance to the list previously
# bound to ``workers``; this assignment then rebinds the name to the new list.
workers = [
    Worker(getattr(importers, module_name))
    for _finder, module_name, _is_pkg in pkgutil.iter_modules(importers.__path__)
    if hasattr(importers, module_name)
]
def poll():
    """Run a single poll on every registered worker, in the calling thread."""
    for registered in workers:
        registered.poll()
def start():
    """Start the thread of every registered worker."""
    for registered in workers:
        registered.start()
def stop():
    """Ask every registered worker to stop; does not wait for the threads."""
    for registered in workers:
        registered.stop()