forked from peterhinch/micropython-async
-
Notifications
You must be signed in to change notification settings - Fork 1
/
asyncio_priority.py
219 lines (205 loc) · 8.87 KB
/
asyncio_priority.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# asyncio_priority.py Modified version of uasyncio with priority mechanism.
# Author: Peter Hinch
# Copyright Peter Hinch 2017 Released under the MIT license
# For uasyncio.core V1.4.1
import utime as time
import utimeq
from uasyncio import *
class PriorityEventLoop(PollEventLoop):
def __init__(self, len=42, lpqlen=42, max_overdue_ms=0, hpqlen=0):
super().__init__(len)
self._max_overdue_ms = max_overdue_ms
self.lpq = utimeq.utimeq(lpqlen)
if hpqlen:
self.hpq = [[0,0,0] for _ in range(hpqlen)]
else:
self.hpq = None
def max_overdue_ms(self, t=None):
if t is not None:
self._max_overdue_ms = t
return self._max_overdue_ms
def call_after_ms(self, delay, callback, args=()):
# low priority.
t = time.ticks_add(self.time(), delay)
if __debug__ and DEBUG:
log.debug("Scheduling LP %s", (time, callback, args))
self.lpq.push(t, callback, args)
def call_after(self, delay, callback, *args):
# low priority.
t = time.ticks_add(self.time(), int(delay * 1000))
if __debug__ and DEBUG:
log.debug("Scheduling LP %s", (time, callback, args))
self.lpq.push(t, callback, args)
def _schedule_hp(self, func, callback, args=()):
if self.hpq is None:
self.hpq = [func, callback, args]
else: # Try to assign without allocation
for entry in self.hpq:
if not entry[0]:
entry[0] = func
entry[1] = callback
entry[2] = args
break
else:
self.hpq.append([func, callback, args])
def run_forever(self):
cur_task = [0, 0, 0]
while True:
if self.q:
# wait() may finish prematurely due to I/O completion,
# and schedule new, earlier than before tasks to run.
while 1:
# Check high priority queue
if self.hpq is not None:
hp_found = False
for entry in self.hpq:
if entry[0] and entry[0]():
hp_found = True
entry[0] = 0
cur_task[0] = 0
cur_task[1] = entry[1] # ??? quick non-allocating copy
cur_task[2] = entry[2]
break
if hp_found:
break
# Schedule most overdue LP coro
tnow = self.time()
if self.lpq and self._max_overdue_ms > 0:
t = self.lpq.peektime()
overdue = -time.ticks_diff(t, tnow)
if overdue > self._max_overdue_ms:
self.lpq.pop(cur_task)
break
# Schedule any due normal task
t = self.q.peektime()
delay = time.ticks_diff(t, tnow)
if delay <= 0:
# https://github.com/micropython/micropython-lib/pull/201
# Always call wait(), to give a chance to I/O scheduling
self.wait(0)
self.q.pop(cur_task)
break
# Schedule any due LP task
if self.lpq:
t = self.lpq.peektime()
lpdelay = time.ticks_diff(t, tnow)
if lpdelay <= 0:
self.lpq.pop(cur_task)
break
delay = min(delay, lpdelay)
self.wait(delay) # superclass
t = cur_task[0]
cb = cur_task[1]
args = cur_task[2]
if __debug__ and DEBUG:
log.debug("Next coroutine to run: %s", (t, cb, args))
# __main__.mem_info()
else: # Normal q is empty
ready = False
if self.lpq:
t = self.lpq.peektime()
delay = time.ticks_diff(t, self.time())
if delay <= 0:
self.lpq.pop(cur_task)
t = cur_task[0]
cb = cur_task[1]
args = cur_task[2]
if __debug__ and DEBUG:
log.debug("Next coroutine to run: %s", (t, cb, args))
ready = True
if not ready:
self.wait(-1)
# Assuming IO completion scheduled some tasks
continue
if callable(cb):
cb(*args)
else:
delay = 0
func = None
priority = True
try:
if __debug__ and DEBUG:
log.debug("Coroutine %s send args: %s", cb, args)
if args == ():
ret = next(cb)
else:
ret = cb.send(*args)
if __debug__ and DEBUG:
log.debug("Coroutine %s yield result: %s", cb, ret)
if isinstance(ret, SysCall1):
arg = ret.arg
if isinstance(ret, AfterMs):
priority = False
if isinstance(ret, Sleep) or isinstance(ret, After):
delay = int(arg * 1000)
elif isinstance(ret, When):
if callable(arg):
func = arg
else:
assert False, "Argument to 'when' must be a function or method."
elif isinstance(ret, SleepMs):
delay = arg
elif isinstance(ret, IORead):
# self.add_reader(ret.obj.fileno(), lambda self, c, f: self.call_soon(c, f), self, cb, ret.obj)
# self.add_reader(ret.obj.fileno(), lambda c, f: self.call_soon(c, f), cb, ret.obj)
# self.add_reader(arg.fileno(), lambda cb: self.call_soon(cb), cb)
self.add_reader(arg, cb)
continue
elif isinstance(ret, IOWrite):
# self.add_writer(arg.fileno(), lambda cb: self.call_soon(cb), cb)
self.add_writer(arg, cb)
continue
elif isinstance(ret, IOReadDone):
self.remove_reader(arg)
elif isinstance(ret, IOWriteDone):
self.remove_writer(arg)
elif isinstance(ret, StopLoop):
return arg
else:
assert False, "Unknown syscall yielded: %r (of type %r)" % (ret, type(ret))
elif isinstance(ret, type_gen):
self.call_soon(ret)
elif isinstance(ret, int):
# Delay
delay = ret
elif ret is None:
# Just reschedule
pass
else:
assert False, "Unsupported coroutine yield value: %r (of type %r)" % (ret, type(ret))
except StopIteration as e:
if __debug__ and DEBUG:
log.debug("Coroutine finished: %s", cb)
continue
# _schedule_hp() and call_after_ms() accept args as a tuple so should
# work with syscalls returning data
if func is not None:
self._schedule_hp(func, cb, args)
else:
if priority:
# Currently all syscalls don't return anything, so we don't
# need to feed anything to the next invocation of coroutine.
# If that changes, need to pass that value below.
self.call_later_ms(delay, cb)
else:
self.call_after_ms(delay, cb, args)
class Sleep(SleepMs):
pass
# Low priority
class AfterMs(SleepMs):
pass
class After(AfterMs):
pass
# High Priority
class When(SleepMs):
pass
after_ms = AfterMs()
after = After()
when = When()
_event_loop = None
_event_loop_class = PriorityEventLoop
def get_event_loop(len=42, lpqlen=42, max_overdue_ms=0, hpqlen=0):
global _event_loop
if _event_loop is None:
_event_loop = _event_loop_class(len, lpqlen, max_overdue_ms, hpqlen)
return _event_loop