-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathasyncio_gather_errors.py
49 lines (31 loc) · 1.01 KB
/
asyncio_gather_errors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import asyncio
from contextlib import asynccontextmanager
async def error_after(n):
for i in range(n):
print(f"{i}/{n} ... ")
await asyncio.sleep(1)
raise Exception(f"blah {n}")
@asynccontextmanager
async def some_context():
print("context started... ")
errors_after = [3, 6, 9, 12, 15]
tasks = [asyncio.create_task(error_after(n)) for n in errors_after]
print("context initialization finished, tasks started...")
yield
print("context shutting down...")
for t in tasks:
t.cancel()
print("tasks cancelled")
results = await asyncio.gather(*tasks, return_exceptions=True)
# try:
# results = await asyncio.gather(*tasks, return_exceptions=True)
# except Exception as e:
# print("Error :", e)
print("results: ", results)
print("context shutdown")
async def run():
async with some_context():
print("main task started...")
await asyncio.sleep(4)
print("main task finished...")
asyncio.run(run())