-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathasync_task_manager.py
81 lines (58 loc) · 2.71 KB
/
async_task_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
"""
Simple asynchronous task executor.
Designed with UI callback handling in mind, so UI click spam
won't take everything down with it.
"""
from typing import Callable, Awaitable, Any, Union
import trio
from loguru import logger
__all__ = ["AsyncTaskManager"]
class AsyncTaskManager:
def __init__(self, max_concurrency=4, execute_interval=0.1):
"""Receive & execute tasks with simple throttling & graceful shutdown.
Args:
max_concurrency: Max number of simultaneous tasks.
Drops the oldest task to accept the newest.
execute_interval: Task scheduling interval.
"""
self.max_concurrency = max_concurrency
self.exe_interval = execute_interval
# binding again to make pylint acknowledge type hint
s_ch, r_ch = trio.open_memory_channel(max_concurrency)
self._send_ch: trio.MemorySendChannel = s_ch
self._recv_ch: trio.MemoryReceiveChannel = r_ch
self._nursery: Union[trio.Nursery, None] = None
async def run_executor(self):
"""Opens new memory channel & starts executor task."""
async with trio.open_nursery() as nursery:
self._nursery = nursery
# await for incoming tasks
async for task, args in self._recv_ch:
logger.debug("Receiving task {}", task.__name__)
# if there's headroom in concurrency lim, start new task
# trio.CapacityLimiter or Lock is overkill for this
if len(nursery.child_tasks) < self.max_concurrency:
nursery.start_soon(task, *args)
else:
logger.debug("Tasks full, dropped {}", task.__name__)
await trio.sleep(self.exe_interval)
logger.debug("Executor stopped")
def add_task(self, async_task: Callable[[Any], Awaitable], *args):
"""Adds task to executor
Args:
async_task: Async function - Not the resulting coroutine or Awaitable Class.
*args: Args for calling async_task. Use partial if you want kwargs.
"""
try:
self._send_ch.send_nowait(async_task, args)
except trio.WouldBlock:
# if queue's full then drop item & add new synchronously.
func, _ = self._recv_ch.receive_nowait()
self._send_ch.send_nowait(async_task, args)
logger.debug("Queue full, dropped '{}'", func.__name__)
async def stop_executor(self):
"""Stops memory channel & executor gracefully."""
await self._send_ch.aclose()
# if nursery is still not closed then manually cancel
if self._nursery.child_tasks:
await self._nursery.cancel_scope.cancel()