-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscheduler.py
227 lines (185 loc) · 9.07 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
"""Scheduler for planning telemetry scrapes and frame processing and
methods for adding frame processing jobs to the scheduler.
Scraping, buffer processing and bucket processing can be parallelized.
Limitations:
- Duplicate tasks are not considered.
- Each satellite can have only 1 bucket processing job scheduled out of the following:
- raw_bucket_processing
- reprocess_entire_raw_bucket
- reprocess_failed_raw_bucket
"""
import datetime
from typing import Callable

from apscheduler.events import (EVENT_JOB_ADDED, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED,
                                EVENT_JOB_REMOVED, EVENT_JOB_SUBMITTED)
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.base import STATE_PAUSED, STATE_RUNNING, STATE_STOPPED
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from django.forms import ValidationError

from django_logger import logger
from transmission.processing.process_raw_bucket import process_raw_bucket
from transmission.processing.satellites import SATELLITES
from transmission.processing.save_raw_data import process_uplink_and_downlink
from transmission.processing.telemetry_scraper import scrape
def get_job_id(satellite: str, job_description: str) -> str:
    """Build a scheduler job id from a satellite name and a job description.

    Every bucket-related job collapses to the shared "<sat>_bucket_processing"
    id, so at most one bucket job per satellite can be scheduled at a time.
    """
    suffix = "_bucket_processing" if "bucket" in job_description else "_" + job_description
    return satellite + suffix
def schedule_job(job_type: str, satellite: str = None, link: str = None,
                 date: datetime.datetime = None, interval: int = None) -> None:
    """Schedule a job for a specified satellite and/or link.

    Args:
        job_type: one of "scraper", "buffer_processing", "raw_bucket_processing",
            "reprocess_entire_raw_bucket", "reprocess_failed_raw_bucket".
        satellite: satellite name; must be a key of SATELLITES for all job
            types except "buffer_processing".
        link: "uplink", "downlink", or None; passed through to bucket processing.
        date: when the task should (first) run. None means "now" for interval
            jobs and an immediate one-shot for date jobs.
        interval: time interval in minutes for recurring tasks; None schedules
            a one-shot job.

    Raises:
        ValidationError: when the satellite is unknown or the link is invalid.

    NOTE(review): an unknown job_type combined with a valid satellite and link
    falls through silently (no job, no error) — preserved from the original
    behavior; confirm whether callers rely on it.
    """
    scheduler = Scheduler()
    scheduler.start_scheduler()

    # Extra positional flags appended after (satellite, link) for each
    # bucket-processing variant: [reprocess_all?, reprocess_failed?].
    bucket_flags = {
        "raw_bucket_processing": [],
        "reprocess_entire_raw_bucket": [True, False],
        "reprocess_failed_raw_bucket": [False, True],
    }

    if job_type == "scraper" and satellite in SATELLITES:
        job_id = get_job_id(satellite, job_type)
        scheduler.add_job_to_schedule(scrape, [satellite], job_id, date, interval)
    elif job_type == "buffer_processing":
        # Buffer processing is satellite-independent; the job id is just the type.
        scheduler.add_job_to_schedule(process_uplink_and_downlink, [], job_type, date, interval)
    elif job_type in bucket_flags and satellite in SATELLITES:
        args = [satellite, link] + bucket_flags[job_type]
        job_id = get_job_id(satellite, job_type)
        scheduler.add_job_to_schedule(process_raw_bucket, args, job_id, date, interval)
    elif satellite not in SATELLITES or link not in ['uplink', 'downlink', None]:
        raise ValidationError("Select a satellite and/or link!")
class Singleton(type):
    """Metaclass that caches one instance per class.

    The first call constructs the instance; every later call returns the
    cached object, so repeated instantiation yields the same instance.
    """
    _instances = {}

    def __call__(cls, *args, **kwargs):
        try:
            return cls._instances[cls]
        except KeyError:
            instance = super().__call__(*args, **kwargs)
            cls._instances[cls] = instance
            return instance
class Scheduler(metaclass=Singleton):
    """Scheduler implementing the singleton design pattern, i.e.
    multiple instantiations will point to the same object.

    Scheduler workflow: add job -> remove job -> submit job -> execute job

    Task triggers:
    - date: run the job just once at a certain point of time
    - interval: run the job at fixed intervals of time
    - cron: run the job periodically at certain time(s) of day

    Scheduler status can be running, paused, or shutdown:
    - running: jobs are scheduled and running
    - paused: jobs' scheduling is paused
    - shutdown: job stores are cleared; tasks can also be killed with wait=False
    """

    __instance = None

    @staticmethod
    def get_instance():
        """Return the singleton instance, or None if never instantiated."""
        return Scheduler.__instance

    def __init__(self) -> None:
        if Scheduler.__instance is not None:
            logger.info("Scheduler already instantiated")
        else:
            # Single worker thread; parallelism is limited deliberately.
            executors = {
                'default': ThreadPoolExecutor(1),
                # 'processpool': ProcessPoolExecutor(0)
            }
            job_defaults = {
                'coalesce': True,    # collapse a backlog of missed runs into one
                'max_instances': 1   # never run two instances of the same job
            }
            self.running_jobs = set()  # ids submitted for execution, not yet finished
            self.pending_jobs = set()  # ids added to the schedule, not yet submitted
            self.scheduler = BackgroundScheduler(job_defaults=job_defaults, executors=executors)
            self.scheduler.add_listener(self.submitted_job_listener, EVENT_JOB_SUBMITTED)
            self.scheduler.add_listener(self.executed_job_listener, EVENT_JOB_EXECUTED)
            # EVENT_JOB_EXECUTED only fires on success; without an error listener
            # a failed job would stay in running_jobs forever and could never be
            # rescheduled by add_job_to_schedule.
            self.scheduler.add_listener(self.error_job_listener, EVENT_JOB_ERROR)
            self.scheduler.add_listener(self.add_job_listener, EVENT_JOB_ADDED)
            self.scheduler.add_listener(self.remove_job_listener, EVENT_JOB_REMOVED)
            Scheduler.__instance = self

    def get_state(self) -> str:
        """Return the state of the scheduler: "running", "paused", or "shutdown"."""
        if self.scheduler.state == STATE_STOPPED:
            return "shutdown"
        if self.scheduler.state == STATE_PAUSED:
            return "paused"
        if self.scheduler.state == STATE_RUNNING:
            return "running"
        return ""

    def add_job_listener(self, event) -> None:
        """Track newly added jobs as pending."""
        logger.info("Scheduler added job: %s", event.job_id)
        self.pending_jobs.add(event.job_id)

    def remove_job_listener(self, event) -> None:
        """Stop tracking removed jobs."""
        logger.info("Scheduler removed job: %s", event.job_id)
        # discard (not remove): an id missing from the set must not raise
        # KeyError inside the scheduler's listener thread.
        self.pending_jobs.discard(event.job_id)

    def executed_job_listener(self, event) -> None:
        """Handle successfully executed jobs and chain follow-up processing."""
        logger.info("Scheduler executed job: %s", event.job_id)
        self.running_jobs.discard(event.job_id)
        # automated processing pipeline:
        # - when a buffer processing task completes, trigger raw bucket processing
        # - when a scraper task completes, trigger raw bucket processing
        if "buffer_processing" in event.job_id:
            for sat in SATELLITES:
                schedule_job("raw_bucket_processing", sat)
        elif "scraper" in event.job_id:
            for sat in SATELLITES:
                if sat in event.job_id:
                    schedule_job("raw_bucket_processing", sat, "downlink")

    def error_job_listener(self, event) -> None:
        """Stop tracking jobs that raised an exception during execution.

        Unlike executed_job_listener, no follow-up processing is chained for
        failed jobs."""
        logger.info("Scheduler job raised an exception: %s", event.job_id)
        self.running_jobs.discard(event.job_id)

    def submitted_job_listener(self, event) -> None:
        """Track jobs submitted to the executor as running."""
        self.running_jobs.add(event.job_id)
        logger.info("Scheduler submitted job: %s", event.job_id)

    def get_pending_jobs(self) -> set:
        """Get the ids of the currently scheduled jobs."""
        return self.pending_jobs

    def get_running_jobs(self) -> set:
        """Get the ids of the currently running jobs."""
        return self.running_jobs

    # pylint:disable=R0913
    def add_job_to_schedule(self, function: Callable, args: list, job_id: str,
                            date: datetime.datetime = None, interval: int = None) -> None:
        """Add a job to the schedule if not already scheduled.

        A non-None interval (minutes) makes the job recurring, starting at
        `date`; otherwise the job runs once at `date` (now if None). A job id
        already pending is rescheduled with the new trigger; a job id currently
        running is left untouched (no duplicate is added).
        """
        if interval is not None:
            trigger = IntervalTrigger(minutes=interval, start_date=date)
        else:
            trigger = DateTrigger(run_date=date)

        if job_id not in self.running_jobs and job_id not in self.pending_jobs:
            self.scheduler.add_job(
                function,
                args=args,
                id=job_id,
                trigger=trigger,
            )
        elif job_id in self.pending_jobs:
            self.scheduler.reschedule_job(job_id, trigger=trigger)

    def start_scheduler(self) -> None:
        """Start the background scheduler (no-op unless currently shut down)."""
        if self.scheduler.state == STATE_STOPPED:
            logger.info("Scheduler started")
            self.scheduler.start()

    def pause_scheduler(self) -> None:
        """Pause the background scheduler (no-op unless currently running)."""
        if self.scheduler.state == STATE_RUNNING:
            logger.info("Scheduler paused")
            self.scheduler.pause()

    def resume_scheduler(self) -> None:
        """Resume the background scheduler (no-op unless currently paused)."""
        if self.scheduler.state == STATE_PAUSED:
            logger.info("Scheduler resumed")
            self.scheduler.resume()

    def force_stop_scheduler(self) -> None:
        """Stop the scheduler. Running tasks will be killed before shutdown."""
        if self.scheduler.state != STATE_STOPPED:
            logger.info("Scheduler force shutdown")
            self.scheduler.shutdown(wait=False)

    def stop_scheduler(self) -> None:
        """Stop the scheduler. Running tasks will finish execution before shutdown."""
        if self.scheduler.state != STATE_STOPPED:
            logger.info("Scheduler shutdown")
            self.scheduler.shutdown()