-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.py
78 lines (62 loc) · 2.42 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import asyncio
import os
import signal
from scripts.rest import ServerManager
from scripts.zeebe import ZeebeWorkerManager
from dotenv import load_dotenv
class ApplicationManager:
    """Start and stop the optional Zeebe worker and REST server together.

    Which components run is decided by the ``ENABLE_WORKER`` and
    ``ENABLE_REST`` environment variables (each must equal ``"true"``).

    Args:
        client_id: Client id forwarded to ``ZeebeWorkerManager``.
        client_secret: Client secret forwarded to ``ZeebeWorkerManager``.
        cluster_id: Cluster id forwarded to ``ZeebeWorkerManager``.
    """

    def __init__(self, client_id, client_secret, cluster_id):
        # Both components stay ``None`` until ``start()`` creates them.
        self.worker_manager = None
        self.rest_server = None
        self.client_id = client_id
        self.client_secret = client_secret
        self.cluster_id = cluster_id
        # Strong references to in-flight shutdown tasks: the event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._shutdown_tasks = set()

    async def shutdown(self):
        """Shut down the REST server first, then the worker, if they exist.

        Each component's ``shutdown()`` is called; when it returns an
        awaitable (i.e. it is implemented as a coroutine) the result is
        awaited so the shutdown actually runs. Synchronous implementations
        behave exactly as before.
        """
        import inspect  # local import: only needed here

        for component in (self.rest_server, self.worker_manager):
            if not component:
                continue
            outcome = component.shutdown()
            if inspect.isawaitable(outcome):
                await outcome

    def add_shutdown_listener(self):
        """Register SIGINT/SIGTERM/SIGBREAK handlers that trigger ``shutdown()``.

        Signals missing on the current platform are skipped. When the event
        loop does not implement ``add_signal_handler`` the handler is
        installed through ``signal.signal`` instead.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # Not called from inside a running loop; keep the old behaviour.
            loop = asyncio.get_event_loop()

        def _schedule_shutdown():
            # Runs inside the event loop; keep the task referenced until done.
            task = loop.create_task(self.shutdown())
            self._shutdown_tasks.add(task)
            task.add_done_callback(self._shutdown_tasks.discard)

        def _raw_signal_handler(signum, frame):
            # ``signal.signal`` handlers receive (signum, frame) and run
            # outside the loop's callback machinery, so hand the work over to
            # the loop in a thread-safe way instead of touching it directly.
            loop.call_soon_threadsafe(_schedule_shutdown)

        for signal_name in ("SIGINT", "SIGTERM", "SIGBREAK"):
            signum = getattr(signal, signal_name, None)
            if signum is None:
                continue
            try:
                loop.add_signal_handler(signum, _schedule_shutdown)
            except NotImplementedError:
                # add_signal_handler may not be implemented on Windows
                signal.signal(signum, _raw_signal_handler)

    async def start(self):
        """Install the signal handlers and run the enabled components.

        The worker is created when ``ENABLE_WORKER`` is ``"true"`` and the
        REST server when ``ENABLE_REST`` is ``"true"``. The call returns once
        every started task has finished (immediately when nothing is enabled).
        """
        self.add_shutdown_listener()
        tasks = []

        if os.getenv("ENABLE_WORKER") == "true":
            self.worker_manager = ZeebeWorkerManager(
                self.client_id, self.client_secret, self.cluster_id
            )
            tasks.append(asyncio.create_task(self.worker_manager.start()))

        if os.getenv("ENABLE_REST") == "true":
            # NOTE(review): ``worker_manager`` is None here when the worker is
            # disabled, and PORT is passed as the raw environment string (or
            # None when unset) -- confirm ServerManager accepts both.
            self.rest_server = ServerManager(self.worker_manager)
            tasks.append(
                asyncio.create_task(
                    self.rest_server.start(port=os.getenv("PORT"))
                )
            )

        await asyncio.gather(*tasks)
if __name__ == "__main__":
    # Pull settings from a local .env file into the environment before any
    # of them are read below.
    load_dotenv()

    # The Zeebe credentials and the Camunda cluster id come straight from
    # the environment; a missing variable is passed through as None.
    app_manager = ApplicationManager(
        os.getenv("ZEEBE_CLIENT_ID"),
        os.getenv("ZEEBE_CLIENT_SECRET"),
        os.getenv("CAMUNDA_CLUSTER_ID"),
    )

    # Run until the started components finish; always report completion.
    try:
        asyncio.run(app_manager.start())
    except KeyboardInterrupt:
        print("Keyboard interrupt")
    finally:
        print("Done")