-
Notifications
You must be signed in to change notification settings - Fork 3
/
schedulerbase.py
239 lines (178 loc) · 6.48 KB
/
schedulerbase.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import abc
import types
class AddWorkerUpdate:
    """Update object holding a node id and an increment.

    ``node_id`` is always passed through ``str()``, so the default ``None``
    is stored as the string ``'None'``.
    """

    def __init__(self, node_id=None, increment=1):
        """Record the stringified node id and the increment.

        Args:
            node_id: identifier of the node; stored as ``str(node_id)``
            increment: stored unchanged (defaults to 1); presumably the
                number of workers being added -- confirm against callers
        """
        self.increment = increment
        self.node_id = str(node_id)

    def __str__(self):
        """Return the fixed label ``'AddWorkerUpdate'``."""
        label = 'AddWorkerUpdate'
        return label
class SubmitTaskUpdate:
    """Update object wrapping a task.

    Two instances compare equal when their tasks compare equal.
    """

    def __init__(self, task):
        """Keep a reference to ``task``.

        Args:
            task: stored as-is; ``__str__`` calls its ``id()`` method
        """
        self.task = task

    def __str__(self):
        """Return ``'SubmitTask(<task id>)'``."""
        task_id = self.task.id()
        return 'SubmitTask({})'.format(task_id)

    def __eq__(self, other):
        """Equal only to instances of this class that hold an equal task."""
        if not isinstance(other, self.__class__):
            return False
        return self.task == other.task

    def __ne__(self, other):
        """Logical negation of ``__eq__``."""
        same = self.__eq__(other)
        return not same
class ForwardTaskUpdate():
    """Update object describing a task together with its submitting node.

    Instances compare equal when the task, the submitting node id and the
    ``is_scheduled_locally`` flag all compare equal.
    """

    def __init__(self, task, submitting_node_id, is_scheduled_locally):
        """Store the task, the stringified node id and the boolean flag.

        Args:
            task: stored as-is; ``__str__`` calls its ``id()`` method
            submitting_node_id: stored as ``str(submitting_node_id)``
            is_scheduled_locally: must be a real ``bool``

        Raises:
            ValueError: if ``is_scheduled_locally`` is not a ``bool``
        """
        self.task = task
        self.submitting_node_id = str(submitting_node_id)
        # isinstance(..., bool) instead of types.BooleanType: the latter only
        # exists on Python 2, and bool cannot be subclassed, so this check
        # accepts exactly the same values.
        if not isinstance(is_scheduled_locally, bool):
            raise ValueError("is scheduled locally must be boolean")
        self.is_scheduled_locally = is_scheduled_locally

    def __str__(self):
        """Return ``'ForwardTask(<task id>,<node id>,<flag>)'``."""
        return 'ForwardTask({},{},{})'.format(
            self.task.id(), self.submitting_node_id, self.is_scheduled_locally)

    def __eq__(self, other):
        """Equal only to instances of this class with equal fields."""
        if not isinstance(other, self.__class__):
            return False
        return (self.task == other.task
                and self.submitting_node_id == other.submitting_node_id
                and self.is_scheduled_locally == other.is_scheduled_locally)

    def __ne__(self, other):
        """Logical negation of ``__eq__``."""
        return not self.__eq__(other)
class ScheduleTaskUpdate:
    """Update object pairing a task with a node id.

    Unlike the other update classes in this file, ``node_id`` is stored
    exactly as given (it is not converted with ``str()``).
    """

    def __init__(self, task, node_id):
        """Store both arguments unchanged.

        Args:
            task: stored as-is; ``__str__`` calls its ``id()`` method
            node_id: stored as-is
        """
        self.node_id = node_id
        self.task = task

    def __str__(self):
        """Return ``'ScheduleTask(<task id>,<node id>)'``."""
        task_id = self.task.id()
        return 'ScheduleTask({},{})'.format(task_id, self.node_id)

    def __eq__(self, other):
        """Equal only to instances of this class with equal task and node."""
        if not isinstance(other, self.__class__):
            return False
        return (self.task == other.task
                and self.node_id == other.node_id)

    def __ne__(self, other):
        """Logical negation of ``__eq__``."""
        same = self.__eq__(other)
        return not same
class FinishTaskUpdate:
    """Update object holding a task id as a string."""

    def __init__(self, task_id):
        """Store ``str(task_id)``.

        Args:
            task_id: identifier of the task; converted with ``str()``
        """
        self.task_id = str(task_id)

    def __str__(self):
        """Return ``'FinishTask(<task id>)'``."""
        template = 'FinishTask({})'
        return template.format(self.task_id)

    def __eq__(self, other):
        """Equal only to instances of this class with the same task id."""
        if not isinstance(other, self.__class__):
            return False
        return self.task_id == other.task_id

    def __ne__(self, other):
        """Logical negation of ``__eq__``."""
        same = self.__eq__(other)
        return not same
class ObjectReadyUpdate:
    """Update object pairing an object description with a node id."""

    def __init__(self, object_description, submitting_node_id):
        """Store the description and the stringified node id.

        Args:
            object_description: stored as-is
            submitting_node_id: stored as ``str(submitting_node_id)``
        """
        self.submitting_node_id = str(submitting_node_id)
        self.object_description = object_description

    def __str__(self):
        """Return ``'ObjectReadyUpdate(<description>,<node id>)'``."""
        description_text = str(self.object_description)
        return 'ObjectReadyUpdate({},{})'.format(
            description_text, self.submitting_node_id)

    def __eq__(self, other):
        """Equal only to instances of this class with equal fields."""
        if not isinstance(other, self.__class__):
            return False
        return (self.object_description == other.object_description
                and self.submitting_node_id == other.submitting_node_id)

    def __ne__(self, other):
        """Logical negation of ``__eq__``."""
        same = self.__eq__(other)
        return not same
class RegisterNodeUpdate:
    """Update object holding a node id and its worker count."""

    def __init__(self, node_id, num_workers):
        """Store the stringified node id and the worker count.

        Args:
            node_id: stored as ``str(node_id)``
            num_workers: stored as-is
        """
        self.num_workers = num_workers
        self.node_id = str(node_id)

    def __str__(self):
        """Return ``'RegisterNode(<node id>,<num workers>)'``."""
        template = 'RegisterNode({},{})'
        return template.format(self.node_id, self.num_workers)

    def __eq__(self, other):
        """Equal only to instances of this class with equal fields."""
        if not isinstance(other, self.__class__):
            return False
        return (self.node_id == other.node_id
                and self.num_workers == other.num_workers)

    def __ne__(self, other):
        """Logical negation of ``__eq__``."""
        same = self.__eq__(other)
        return not same
class RemoveNodeUpdate():
    """Update object holding the id of a node as a string.

    Instances compare equal when their node ids are equal.
    """

    def __init__(self, node_id):
        """Store ``str(node_id)``.

        Args:
            node_id: identifier of the node; converted with ``str()``
        """
        self.node_id = str(node_id)

    def __str__(self):
        """Return ``'RemoveNode(<node id>)'``."""
        return 'RemoveNode({})'.format(self.node_id)

    def __eq__(self, other):
        """Equal only to instances of this class with the same node id."""
        if not isinstance(other, self.__class__):
            return False
        # The attribute set by __init__ is ``node_id``; comparing a
        # non-existent ``_node_id`` raised AttributeError.
        return self.node_id == other.node_id

    def __ne__(self, other):
        """Logical negation of ``__eq__``."""
        return not self.__eq__(other)
class ObjectStatus:
    """Integer constants naming the three object states used in this file."""

    # Consecutive integers starting at zero: UNKNOWN=0, EXPECTED=1, READY=2.
    UNKNOWN, EXPECTED, READY = 0, 1, 2
class AbstractNodeRuntime:
    """Interface of the node runtime used by the local scheduler.

    Both methods are declared abstract and have no behaviour of their own.
    """

    # Python 2 metaclass declaration; on Python 3 this is only a plain
    # class attribute and does not install the metaclass.
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def send_to_dispatcher(self, task, priority):
        """Hand a task over to the worker execution engine.

        Implementations must keep FIFO order among tasks that share the
        same priority.

        Args:
            task: id of the task to schedule
            priority: integer priority; a lower number means a higher
                priority
        """

    @abc.abstractmethod
    def get_updates(self, update_handler):
        """Register a handler for local runtime events.

        Called by the local scheduler.

        Args:
            update_handler: handler to register for the events
        """
class AbstractSchedulerDatabase:
    """Interface of the scheduler database.

    Every method is declared abstract and has no behaviour of its own.
    """

    # Python 2 metaclass declaration; on Python 3 this is only a plain
    # class attribute and does not install the metaclass.
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def submit(self, task):
        """Submit a task to the scheduler.

        May be called by a driver or a worker program, either directly or
        by proxy through the local scheduler.

        Args:
            task: Task object describing the task to schedule
        """

    @abc.abstractmethod
    def finished(self, task_id):
        """Report completion of a task to the scheduler.

        May be called by a worker program, either directly or by proxy
        through the local scheduler.

        Args:
            task_id: id of the completed task
        """

    @abc.abstractmethod
    def register_node(self, node_id, num_workers):
        """Report that a new node has been added.

        Args:
            node_id: id of the newly added node
            num_workers: number of workers the node supports
        """

    @abc.abstractmethod
    def remove_node(self, node_id):
        """Report that a node has been removed.

        Args:
            node_id: id of the node being removed
        """

    @abc.abstractmethod
    def get_global_scheduler_updates(self, update_handler):
        """Register the global scheduler's handler for updates.

        Called by the global scheduler.

        Args:
            update_handler: function for processing updates
        """

    @abc.abstractmethod
    def get_local_scheduler_updates(self, node_id, update_handler):
        """Register a local scheduler's handler for updates.

        Called by the local scheduler.

        Args:
            node_id: presumably the id of the node whose local scheduler
                is registering -- confirm against implementations
            update_handler: function for processing updates
        """

    @abc.abstractmethod
    def schedule(self, node_id, task_id):
        """Instruct a node to execute a task.

        Called by the scheduler.

        Args:
            node_id: presumably the id of the node that should run the
                task -- confirm against implementations
            task_id: id of the task to execute
        """