-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmanager.py
320 lines (252 loc) · 8.46 KB
/
manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
from __future__ import annotations
import asyncio
from signal import SIGINT, default_int_handler, signal
import pytest
from creart import it
from launart import Launart
from launart.service import Service
from tests.fixture import EmptyService, component, service
@pytest.mark.asyncio
async def test_nothing():
mgr = Launart()
lc = EmptyService()
mgr.add_component(lc)
await mgr.launch()
assert lc.triggered
def test_nothing_blocking():
mgr = Launart()
lc = EmptyService()
mgr.add_component(lc)
mgr.launch_blocking()
assert lc.triggered
# set SIGINT and do again
lc.triggered = False
loop = it(asyncio.AbstractEventLoop)
tsk = loop.create_task(asyncio.sleep(2.0))
tsk2 = loop.create_task(asyncio.sleep(0))
signal(SIGINT, lambda *_: None)
mgr.launch_blocking()
assert tsk.cancelled()
assert tsk.done()
assert tsk2.done()
assert lc.triggered
signal(SIGINT, default_int_handler)
def test_nothing_complex():
mgr = Launart()
class _L(Service):
id = "empty"
triggered = False
@property
def stages(self):
return {"blocking"}
@property
def required(self):
return set()
async def launch(self, _):
async with self.stage("blocking"):
await asyncio.sleep(0.2)
lc = _L()
mgr.add_component(lc)
loop = asyncio.new_event_loop()
launch_tsk = loop.create_task(mgr.launch())
loop.run_until_complete(asyncio.sleep(0.02)) # head time
assert "empty" in mgr.tasks
wrong_launch = loop.create_task(mgr.launch())
loop.run_until_complete(asyncio.sleep(0.1))
mgr._on_sys_signal(None, None, launch_tsk)
assert not launch_tsk.done()
assert wrong_launch.done()
loop.run_until_complete(launch_tsk)
mgr._on_sys_signal(None, None, launch_tsk)
def test_manager_stat():
mgr = Launart()
class _L(Service):
id = "test_stat"
@property
def stages(self):
return {"preparing", "blocking", "cleanup"}
@property
def required(self):
return set()
async def launch(self, _):
await self.status.wait_for() # test empty wait
await asyncio.sleep(0.02)
async with self.stage("preparing"):
await asyncio.sleep(0.02)
await asyncio.sleep(0.02)
assert self.status.prepared
async with self.stage("blocking"):
assert self.status.prepared
assert self.status.blocking
await asyncio.sleep(0.02)
await asyncio.sleep(0.02)
async with self.stage("cleanup"):
assert not self.status.prepared
await asyncio.sleep(0.02)
await asyncio.sleep(0.02)
mgr.add_component(_L())
loop = asyncio.new_event_loop()
mk_task = loop.create_task
tasks = [mk_task(mgr.status.wait_for_preparing())]
loop.run_until_complete(asyncio.sleep(0.01))
mk_task(mgr.launch())
tasks.append(mk_task(mgr.status.wait_for_blocking()))
tasks.append(mk_task(mgr.status.wait_for_cleaning()))
loop.run_until_complete(asyncio.sleep(0.02))
tasks.append(mk_task(mgr.status.wait_for_sigexit()))
tasks.append(mk_task(mgr.status.wait_for_finished()))
loop.run_until_complete(asyncio.sleep(0.2))
for task in tasks:
assert task.done() and not task.cancelled()
@pytest.mark.asyncio
async def test_wait_for(event_loop: asyncio.AbstractEventLoop):
loop = event_loop
mgr = Launart()
class _L(Service):
id = "test_stat"
@property
def stages(self):
return {"preparing", "blocking", "cleanup"}
@property
def required(self):
return set()
async def launch(self, _):
await self.status.wait_for() # test empty wait
await asyncio.sleep(0.02)
async with self.stage("preparing"):
await asyncio.sleep(0.02)
await asyncio.sleep(0.02)
assert self.status.prepared
async with self.stage("blocking"):
assert self.status.prepared
assert self.status.blocking
await asyncio.sleep(0.02)
await asyncio.sleep(0.02)
async with self.stage("cleanup"):
assert not self.status.prepared
await asyncio.sleep(0.02)
l = _L()
with pytest.raises(RuntimeError):
await l.wait_for("preparing", "test_stat")
mgr.add_component(l)
mk_task = loop.create_task
mk_task(mgr.launch())
await l.wait_for("finished", "test_stat")
def test_signal_change_during_running():
mgr = Launart()
class _L(Service):
id = "empty"
triggered = False
@property
def stages(self):
return {"blocking"}
@property
def required(self):
return set()
async def launch(self, _):
signal(SIGINT, lambda *_: None)
lc = _L()
mgr.add_component(lc)
mgr.launch_blocking()
def test_bare_bone():
mgr = Launart()
lc = component("component.test", [])
mgr.add_component(lc)
assert mgr.components["component.test"] == lc
srv = service("service.test", [])
mgr.add_component(srv)
assert mgr.components["service.test"] == srv
assert mgr.get_component("service.test") is srv
assert mgr.get_component("service.test") is srv
with pytest.raises(ValueError):
mgr.add_component(lc)
with pytest.raises(ValueError):
mgr.get_component("$?")
@pytest.mark.asyncio
async def test_basic_components():
stage = []
class TestService(Service):
id = "lc.test"
required: set[str] = {"service.test"}
@property
def stages(self):
return {"preparing", "blocking", "cleanup"}
async def launch(self, _):
async with self.stage("preparing"):
stage.append("lc prepare")
async with self.stage("blocking"):
stage.append("blocking")
assert isinstance(mgr.tasks["lc.test"], asyncio.Task)
async with self.stage("cleanup"):
stage.append("lc cleanup")
class TestSrv(Service):
id = "service.test"
@property
def required(self):
return set()
@property
def stages(self):
return {"preparing", "blocking", "cleanup"}
async def launch(self, _):
async with self.stage("preparing"):
stage.append("srv prepare")
async with self.stage("blocking"):
stage.append("blocking")
async with self.stage("cleanup"):
stage.append("srv cleanup")
def get_interface(self, interface_type):
return interface_type()
mgr = Launart()
mgr.add_component(TestService())
mgr.add_component(TestSrv())
with pytest.raises(KeyError):
mgr.tasks["service.test"]
await mgr.launch()
print(stage)
assert stage == ["srv prepare", "lc prepare", "blocking", "blocking", "lc cleanup", "srv cleanup"]
def test_graceful_abort():
failure: bool = False
class Malfunction(Service):
id = "malfunction"
@property
def required(self):
return set()
@property
def stages(self):
return {"preparing"}
async def launch(self, _):
async with self.stage("preparing"):
raise ValueError
class Dependent(Service):
id = "dependent"
@property
def required(self):
return {"malfunction"}
@property
def stages(self):
return {"preparing", "blocking"}
async def launch(self, _):
async with self.stage("preparing"):
...
async with self.stage("blocking"):
nonlocal failure
failure = True
class Okay(Service):
id = "okay"
@property
def required(self):
return set()
@property
def stages(self):
return {"preparing", "blocking"}
async def launch(self, _):
async with self.stage("preparing"):
pass
async with self.stage("blocking"):
pass
mgr = Launart()
mgr.add_component(Malfunction())
mgr.add_component(Dependent())
mgr.launch_blocking()
if failure:
pytest.fail("Error: dependent reached blocking stage while dependency failed to prepare.")