-
Notifications
You must be signed in to change notification settings - Fork 0
/
ingest.py
73 lines (56 loc) · 2.09 KB
/
ingest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import json
import pickle
import uuid
from datetime import datetime
from tinydb import TinyDB, where
# Scan tasks are managed through a Redis list (see IngestQueue), which exposes
# the queue length plus enqueue (LPUSH) and blocking dequeue (BRPOP) operations.
# The Scan class wraps a single scan task and generates its short id (scan_id).
# Redis command reference: https://redis.io/commands
# Module-level TinyDB store used to track each scan's status, keyed by scan id.
# It is constructed at import time; the path is relative to the process's
# current working directory.
db = TinyDB('./data/scans_db.json')
class IngestQueue(object):
    """Redis-list-backed FIFO queue of pickled scan tasks.

    Tasks are pushed on the left (LPUSH) and popped from the right (BRPOP),
    so they are consumed in the order they were enqueued. Status changes are
    mirrored into the module-level TinyDB ``db``, matched on the task's ``id``.

    SECURITY: tasks are serialized with ``pickle`` and unpickled on read.
    Unpickling can execute arbitrary code, so the Redis list must only be
    writable by trusted producers.
    """

    def __init__(self, conn, name):
        """Bind the queue to a Redis connection and a list key.

        Args:
            conn: Redis client. Presumably a redis-py ``Redis`` instance;
                this class calls its ``llen``, ``lrange``, ``lpush`` and
                ``brpop`` methods.
            name: Key of the Redis list that holds the queued tasks.
        """
        self.conn = conn
        self.name = name

    def get_length(self):
        """Return the number of tasks currently waiting in the queue."""
        return self.conn.llen(self.name)

    def by_id(self, id):
        """Return the queued task whose ``id`` equals *id*, without removing it.

        Args:
            id: The scan id to look for. It is compared as a string, so an
                int is accepted as long as its decimal text equals the id.

        Returns:
            The first matching task in list order (most recently enqueued first).

        Raises:
            IndexError: If no queued task has that id.
        """
        wanted = str(id)
        for serialized_task in self.conn.lrange(self.name, 0, -1):
            task = pickle.loads(serialized_task)  # trusted producers only, see class docstring
            # Compare as strings: scan ids are hex text (e.g. "3fa9b0c1"),
            # so converting them with int() would fail for most ids.
            if str(task.id) == wanted:
                return task
        raise IndexError(f"no queued scan task with id {wanted!r}")

    def enqueue(self, scan_task):
        """Mark *scan_task* as "RUNNING" in ``db`` and push it onto the queue.

        Note that the status becomes "RUNNING" as soon as the task is queued,
        not when a worker actually starts it.

        Returns:
            The task's ``id``.
        """
        # Write the status BEFORE pushing: if it were written after, a fast
        # worker could finish the task first and have its final status
        # overwritten by this "RUNNING" update.
        db.update({"status": "RUNNING"}, where('id') == scan_task.id)
        # Serialize in memory only; the bytes go straight into Redis.
        serialized_task = pickle.dumps(scan_task, protocol=pickle.HIGHEST_PROTOCOL)
        self.conn.lpush(self.name, serialized_task)
        return scan_task.id

    def dequeue(self):
        """Wait for the oldest task, run it, and record its outcome in ``db``.

        The value returned by ``task.process_scan()`` is stored as the task's
        status, together with the local wall-clock completion time
        ("HH:MM:SS") under the ``ttl`` key. If the scan raises, the status is
        set to "FAILED" (so it does not stay "RUNNING" forever) and the
        exception is re-raised.

        Returns:
            The task that was processed.
        """
        # brpop is called without a timeout, so this blocks until a task is
        # available (presumably redis-py's timeout=0 default); it yields (key, value).
        _, serialized_task = self.conn.brpop(self.name)
        task = pickle.loads(serialized_task)  # trusted producers only, see class docstring
        try:
            updated_status_from_task = task.process_scan()
        except Exception:
            db.update({"status": "FAILED", "ttl": self._clock_time()}, where('id') == task.id)
            raise
        db.update({"status": updated_status_from_task, "ttl": self._clock_time()},
                  where('id') == task.id)
        return task

    @staticmethod
    def _clock_time():
        """Return the current local wall-clock time formatted as "HH:MM:SS"."""
        return datetime.now().strftime("%H:%M:%S")
class Scan:
    """A deferred scan: a callable plus the positional arguments to call it with.

    Attributes:
        id: Random 8-character lowercase hex identifier (the scan_id).
        func: Callable executed by ``process_scan``.
        args: Tuple of positional arguments passed to ``func``.
        status: Caller-supplied status value, stored as-is (``None`` by default).
        ttl: Caller-supplied value, stored as-is (``None`` by default); nothing
            in this class interprets it.
    """

    def __init__(self, func, *args, status=None, ttl=None):
        # A UUID4's canonical string starts with its first 8 hex digits,
        # which is exactly ``hex[:8]``.
        self.id = uuid.uuid4().hex[:8]
        # Assigned left to right, keeping attribute order id, func, args,
        # status, ttl (this order shows up in ``_to_json`` output).
        self.func, self.args = func, args
        self.status, self.ttl = status, ttl

    def update_status(self, status):
        """Set ``status`` and return ``self`` so calls can be chained."""
        self.status = status
        return self

    def update_ttl(self, ttl):
        """Set ``ttl`` and return ``self`` so calls can be chained."""
        self.ttl = ttl
        return self

    def process_scan(self):
        """Call ``func(*args)`` and hand back whatever it returns."""
        return self.func(*self.args)

    def _to_json(self):
        """Return a JSON-compatible ``dict`` snapshot of this scan.

        Objects the encoder cannot handle natively (this scan itself, and
        ``func``) are replaced by their ``__dict__``. For a plain function
        that is usually ``{}``. Objects without a ``__dict__`` make the
        encoding fail with ``AttributeError``. The text is decoded again, so
        tuples come back as lists.
        """
        def attributes_of(obj):
            return obj.__dict__

        encoded = json.dumps(self, default=attributes_of)
        return json.loads(encoded)