33# SPDX-License-Identifier: MIT
44#
55# MicroPython uasyncio module
6- # MIT license; Copyright (c) 2019-2020 Damien P. George
6+ # MIT license; Copyright (c) 2019-2022 Damien P. George
77#
88# This code comes from MicroPython, and has not been run through black or pylint there.
99# Altering these files significantly would make merging difficult, so we will not use
1919from . import core
2020
2121
22+ async def _run (waiter , aw ):
23+ try :
24+ result = await aw
25+ status = True
26+ except BaseException as er :
27+ result = None
28+ status = er
29+ if waiter .data is None :
30+ # The waiter is still waiting, cancel it.
31+ if waiter .cancel ():
32+ # Waiter was cancelled by us, change its CancelledError to an instance of
33+ # CancelledError that contains the status and result of waiting on aw.
34+ # If the wait_for task subsequently gets cancelled externally then this
35+ # instance will be reset to a CancelledError instance without arguments.
36+ waiter .data = core .CancelledError (status , result )
37+
2238async def wait_for (aw , timeout , sleep = core .sleep ):
2339 """Wait for the *aw* awaitable to complete, but cancel if it takes longer
2440 than *timeout* seconds. If *aw* is not a task then a task will be created
@@ -36,41 +52,26 @@ async def wait_for(aw, timeout, sleep=core.sleep):
3652 if timeout is None :
3753 return await aw
3854
39- async def runner (waiter , aw ):
40- nonlocal status , result
41- try :
42- result = await aw
43- s = True
44- except BaseException as er :
45- s = er
46- if status is None :
47- # The waiter is still waiting, set status for it and cancel it.
48- status = s
49- waiter .cancel ()
50-
5155 # Run aw in a separate runner task that manages its exceptions.
52- status = None
53- result = None
54- runner_task = core .create_task (runner (core .cur_task , aw ))
56+ runner_task = core .create_task (_run (core .cur_task , aw ))
5557
5658 try :
5759 # Wait for the timeout to elapse.
5860 await sleep (timeout )
5961 except core .CancelledError as er :
60- if status is True :
61- # aw completed successfully and cancelled the sleep, so return aw's result.
62- return result
63- elif status is None :
62+ status = er .value
63+ if status is None :
6464 # This wait_for was cancelled externally, so cancel aw and re-raise.
65- status = True
6665 runner_task .cancel ()
6766 raise er
67+ elif status is True :
68+ # aw completed successfully and cancelled the sleep, so return aw's result.
69+ return er .args [1 ]
6870 else :
6971 # aw raised an exception, propagate it out to the caller.
7072 raise status
7173
7274 # The sleep finished before aw, so cancel aw and raise TimeoutError.
73- status = True
7475 runner_task .cancel ()
7576 await runner_task
7677 raise core .TimeoutError
@@ -85,6 +86,12 @@ def wait_for_ms(aw, timeout):
8586 return wait_for (aw , timeout , core .sleep_ms )
8687
8788
89+ class _Remove :
90+ @staticmethod
91+ def remove (t ):
92+ pass
93+
94+
8895async def gather (* aws , return_exceptions = False ):
8996 """Run all *aws* awaitables concurrently. Any *aws* that are not tasks
9097 are promoted to tasks.
@@ -93,22 +100,65 @@ async def gather(*aws, return_exceptions=False):
93100
94101 This is a coroutine.
95102 """
103+ def done (t , er ):
104+ # Sub-task "t" has finished, with exception "er".
105+ nonlocal state
106+ if gather_task .data is not _Remove :
107+ # The main gather task has already been scheduled, so do nothing.
108+ # This happens if another sub-task already raised an exception and
109+ # woke the main gather task (via this done function), or if the main
110+ # gather task was cancelled externally.
111+ return
112+ elif not return_exceptions and not isinstance (er , StopIteration ):
113+ # A sub-task raised an exception, indicate that to the gather task.
114+ state = er
115+ else :
116+ state -= 1
117+ if state :
118+ # Still some sub-tasks running.
119+ return
120+ # Gather waiting is done, schedule the main gather task.
121+ core ._task_queue .push (gather_task )
96122
97123 ts = [core ._promote_to_task (aw ) for aw in aws ]
98124 for i in range (len (ts )):
99- try :
100- # TODO handle cancel of gather itself
101- # if ts[i].coro:
102- # iter(ts[i]).waiting.push_head(cur_task)
103- # try:
104- # yield
105- # except CancelledError as er:
106- # # cancel all waiting tasks
107- # raise er
108- ts [i ] = await ts [i ]
109- except (core .CancelledError , Exception ) as er :
110- if return_exceptions :
111- ts [i ] = er
112- else :
113- raise er
125+ if ts [i ].state is not True :
126+ # Task is not running, gather not currently supported for this case.
127+ raise RuntimeError ("can't gather" )
128+ # Register the callback to call when the task is done.
129+ ts [i ].state = done
130+
131+ # Set the state for execution of the gather.
132+ gather_task = core .cur_task
133+ state = len (ts )
134+ cancel_all = False
135+
136+ # Wait for the a sub-task to need attention.
137+ gather_task .data = _Remove
138+ try :
139+ yield
140+ except core .CancelledError as er :
141+ cancel_all = True
142+ state = er
143+
144+ # Clean up tasks.
145+ for i in range (len (ts )):
146+ if ts [i ].state is done :
147+ # Sub-task is still running, deregister the callback and cancel if needed.
148+ ts [i ].state = True
149+ if cancel_all :
150+ ts [i ].cancel ()
151+ elif isinstance (ts [i ].data , StopIteration ):
152+ # Sub-task ran to completion, get its return value.
153+ ts [i ] = ts [i ].data .value
154+ else :
155+ # Sub-task had an exception with return_exceptions==True, so get its exception.
156+ ts [i ] = ts [i ].data
157+
158+ # Either this gather was cancelled, or one of the sub-tasks raised an exception with
159+ # return_exceptions==False, so reraise the exception here.
160+ if state is not 0 :
161+ raise state
162+
163+ # Return the list of return values of each sub-task.
114164 return ts
0 commit comments