diff --git a/tornado/gen.py b/tornado/gen.py index 7cc7ec7857..aa32a2fcd4 100644 --- a/tornado/gen.py +++ b/tornado/gen.py @@ -76,7 +76,6 @@ def get(self): from functools import singledispatch from inspect import isawaitable import sys -import types from tornado.concurrent import ( Future, @@ -200,31 +199,17 @@ def wrapper(*args, **kwargs): future = None # type: ignore else: if isinstance(result, Generator): - # Inline the first iteration of Runner.run. This lets us - # avoid the cost of creating a Runner when the coroutine - # never actually yields, which in turn allows us to - # use "optional" coroutines in critical path code without - # performance penalty for the synchronous case. - try: - yielded = next(result) - except (StopIteration, Return) as e: - future_set_result_unless_cancelled( - future, _value_from_stopiteration(e) - ) - except Exception: - future_set_exc_info(future, sys.exc_info()) - else: - # Provide strong references to Runner objects as long - # as their result future objects also have strong - # references (typically from the parent coroutine's - # Runner). This keeps the coroutine's Runner alive. - # We do this by exploiting the public API - # add_done_callback() instead of putting a private - # attribute on the Future. - # (Github issues #1769, #2229). - runner = Runner(result, future, yielded) - future.add_done_callback(lambda _: runner) - yielded = None + # Provide strong references to Runner objects as long + # as their result future objects also have strong + # references (typically from the parent coroutine's + # Runner). This keeps the coroutine's Runner alive. + # We do this by exploiting the public API + # add_done_callback() instead of putting a private + # attribute on the Future. + # (Github issues #1769, #2229). + runner = Runner(result, future) + future.add_done_callback(runner.finish) + try: return future finally: @@ -698,10 +683,7 @@ class Runner(object): """ def __init__( - self, - gen: "Generator[_Yieldable, Any, _T]", - result_future: "Future[_T]", - first_yielded: _Yieldable, + self, gen: "Generator[_Yieldable, Any, _T]", result_future: "Future[_T]" ) -> None: self.gen = gen self.result_future = result_future @@ -709,97 +691,73 @@ def __init__( self.running = False self.finished = False self.io_loop = IOLoop.current() - if self.handle_yield(first_yielded): - gen = result_future = first_yielded = None # type: ignore - self.run() + self.task = asyncio.ensure_future(self.run()) + + async def run(self) -> None: + "Runs the generator to completion in the context of a task" + while True: + future = self.future + if future is None: + raise Exception("No pending future") + if not self.future.done(): # type: ignore + _step = asyncio.Event() + + def step(f: "Future[_T]") -> None: + _step.set() + + self.io_loop.add_future(self.future, step) # type: ignore + await _step.wait() + self.future = None + try: + exc_info = None - def run(self) -> None: - """Starts or resumes the generator, running until it reaches a - yield point that is not ready. - """ - if self.running or self.finished: - return - try: - self.running = True - while True: - future = self.future - if future is None: - raise Exception("No pending future") - if not future.done(): - return - self.future = None try: - exc_info = None + value = future.result() + except Exception: + exc_info = sys.exc_info() + future = None + if exc_info is not None: try: - value = future.result() - except Exception: - exc_info = sys.exc_info() - future = None - - if exc_info is not None: - try: - yielded = self.gen.throw(*exc_info) # type: ignore - finally: - # Break up a reference to itself - # for faster GC on CPython. - exc_info = None - else: - yielded = self.gen.send(value) + yielded = self.gen.throw(*exc_info) # type: ignore + finally: + # Break up a reference to itself + # for faster GC on CPython. + exc_info = None + else: + yielded = self.gen.send(value) - except (StopIteration, Return) as e: - self.finished = True - self.future = _null_future - future_set_result_unless_cancelled( - self.result_future, _value_from_stopiteration(e) - ) - self.result_future = None # type: ignore - return - except Exception: - self.finished = True - self.future = _null_future - future_set_exc_info(self.result_future, sys.exc_info()) - self.result_future = None # type: ignore - return - if not self.handle_yield(yielded): - return - yielded = None - finally: - self.running = False - - def handle_yield(self, yielded: _Yieldable) -> bool: + except (StopIteration, Return) as e: + self.finished = True + self.future = _null_future + future_set_result_unless_cancelled( + self.result_future, _value_from_stopiteration(e) + ) + self.result_future = None # type: ignore + return + except Exception: + self.finished = True + self.future = _null_future + future_set_exc_info(self.result_future, sys.exc_info()) + self.result_future = None # type: ignore + return + + self.handle_yield(yielded) + yielded = None + if self.future is moment: + await sleep(0) + + def handle_yield(self, yielded: _Yieldable) -> None: try: self.future = convert_yielded(yielded) except BadYieldError: self.future = Future() future_set_exc_info(self.future, sys.exc_info()) - if self.future is moment: - self.io_loop.add_callback(self.run) - return False - elif self.future is None: - raise Exception("no pending future") - elif not self.future.done(): - - def inner(f: Any) -> None: - # Break a reference cycle to speed GC. - f = None # noqa: F841 - self.run() - - self.io_loop.add_future(self.future, inner) - return False - return True - - def handle_exception( - self, typ: Type[Exception], value: Exception, tb: types.TracebackType - ) -> bool: - if not self.running and not self.finished: - self.future = Future() - future_set_exc_info(self.future, (typ, value, tb)) - self.run() - return True - else: - return False + def finish(self, future: "Future[_T]") -> None: + if future.cancelled(): + self.task.cancel() + self.future.cancel() # type: ignore # Convert Awaitables into Futures. diff --git a/tornado/ioloop.py b/tornado/ioloop.py index a0598727a4..6cb2367ab2 100644 --- a/tornado/ioloop.py +++ b/tornado/ioloop.py @@ -506,7 +506,11 @@ def run() -> None: future_cell[0] = fut fut.set_result(result) assert future_cell[0] is not None - self.add_future(future_cell[0], lambda future: self.stop()) + + def _stop(f: "Future[_T]") -> None: + f.add_done_callback(lambda _: self.stop()) + + self.add_future(future_cell[0], _stop) self.add_callback(run) if timeout is not None: diff --git a/tornado/test/gen_test.py b/tornado/test/gen_test.py index 659e22605e..37bad9fb53 100644 --- a/tornado/test/gen_test.py +++ b/tornado/test/gen_test.py @@ -636,6 +636,23 @@ def f(): self.assertEqual(ret, [1, 1]) self.finished = True + def test_coroutine_context(self): + @gen.coroutine + def f(): + current_task = getattr(asyncio, "current_task", None) + if current_task is None: + current_task = getattr(asyncio.Task, "current_task", None) + task = current_task() + assert task + _id = id(task) + yield gen.moment + task = current_task() + assert task + assert _id == id(task) + + self.io_loop.run_sync(f, timeout=3) + self.finished = True + class GenCoroutineSequenceHandler(RequestHandler): @gen.coroutine @@ -995,8 +1012,9 @@ def do_something(): yield gen.sleep(0.2) loop.run_sync(do_something) - loop.close() - gc.collect() + with ExpectLog("asyncio", "Task was destroyed but it is pending"): + loop.close() + gc.collect() # Future was collected self.assertIs(wfut[0](), None) # At least one wakeup