-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker_queue.py
96 lines (79 loc) · 2.85 KB
/
worker_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from time import sleep
from helper import get_mysql_session
from logger import logger
from tables import Tasks
from theodoretools.bot import feishu_text
from config import APP_URL
import copy
from sqlalchemy import update
from sqlalchemy.orm.session import Session
from task_cache import TaskCache
def check_status(db: Session, task_id: int):
task = db.query(Tasks).filter(Tasks.id == task_id).first()
target_requests = task.request_per_thread * task.threads
total_requested = task.request_succeed + task.request_failed
if task.request_failed == target_requests:
db.execute(
update(Tasks)
.where(Tasks.id == task_id)
.values(status=3, error_message="All requests failed")
)
db.commit()
if task.feishu_token:
feishu_text(
f"All requests failed task: {task.name}: {APP_URL}/?task_id={task.id}",
task.feishu_token,
)
return
if total_requested == target_requests:
db.execute(
update(Tasks).where(Tasks.id == task_id).values(status=4, error_message="")
)
db.commit()
if task.feishu_token:
feishu_text(
f"Task {task.name} succeed: {APP_URL}/?task_id={task.id}",
task.feishu_token,
)
return
if __name__ == "__main__":
db = get_mysql_session()
cache = TaskCache()
while True:
def persist_to_db(items):
if len(items) > 0:
for item in items:
db.add(copy.deepcopy(item))
db.commit()
logger.info(f"Persisted {len(items)} items to db")
try:
chunks = cache.chunk_dequeue(20)
persist_to_db(chunks)
logs = cache.log_dequeue(20)
persist_to_db(logs)
requests = cache.request_dequeue(10)
persist_to_db(requests)
for request in requests:
if request.success == 1:
db.execute(
update(Tasks)
.where(Tasks.id == request.task_id)
.values(request_succeed=Tasks.request_succeed + 1)
)
else:
db.execute(
update(Tasks)
.where(Tasks.id == request.task_id)
.values(request_failed=Tasks.request_failed + 1)
)
if len(requests) > 0:
db.commit()
check_status(db, requests[0].task_id)
if len(chunks) == 0 and len(logs) == 0 and len(requests) == 0:
sleep(1)
except Exception as e:
logger.error(f"Error: {e}", exc_info=True)
cache.reset()
db.rollback()
db.close()
db = get_mysql_session()