Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run tornado gen.coroutines in the context of a task. #2716

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 68 additions & 110 deletions tornado/gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ def get(self):
from functools import singledispatch
from inspect import isawaitable
import sys
import types

from tornado.concurrent import (
Future,
Expand Down Expand Up @@ -200,31 +199,17 @@ def wrapper(*args, **kwargs):
future = None # type: ignore
else:
if isinstance(result, Generator):
# Inline the first iteration of Runner.run. This lets us
# avoid the cost of creating a Runner when the coroutine
# never actually yields, which in turn allows us to
# use "optional" coroutines in critical path code without
# performance penalty for the synchronous case.
try:
yielded = next(result)
except (StopIteration, Return) as e:
future_set_result_unless_cancelled(
future, _value_from_stopiteration(e)
)
except Exception:
future_set_exc_info(future, sys.exc_info())
else:
# Provide strong references to Runner objects as long
# as their result future objects also have strong
# references (typically from the parent coroutine's
# Runner). This keeps the coroutine's Runner alive.
# We do this by exploiting the public API
# add_done_callback() instead of putting a private
# attribute on the Future.
# (Github issues #1769, #2229).
runner = Runner(result, future, yielded)
future.add_done_callback(lambda _: runner)
yielded = None
# Provide strong references to Runner objects as long
# as their result future objects also have strong
# references (typically from the parent coroutine's
# Runner). This keeps the coroutine's Runner alive.
# We do this by exploiting the public API
# add_done_callback() instead of putting a private
# attribute on the Future.
# (Github issues #1769, #2229).
runner = Runner(result, future)
future.add_done_callback(runner.finish)

try:
return future
finally:
Expand Down Expand Up @@ -698,108 +683,81 @@ class Runner(object):
"""

def __init__(
self,
gen: "Generator[_Yieldable, Any, _T]",
result_future: "Future[_T]",
first_yielded: _Yieldable,
self, gen: "Generator[_Yieldable, Any, _T]", result_future: "Future[_T]"
) -> None:
self.gen = gen
self.result_future = result_future
self.future = _null_future # type: Union[None, Future]
self.running = False
self.finished = False
self.io_loop = IOLoop.current()
if self.handle_yield(first_yielded):
gen = result_future = first_yielded = None # type: ignore
self.run()
self.task = asyncio.ensure_future(self.run())

async def run(self) -> None:
"Runs the generator to completion in the context of a task"
while True:
future = self.future
if future is None:
raise Exception("No pending future")
if not self.future.done(): # type: ignore
_step = asyncio.Event()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style note: Tornado uses a leading underscore on "private" class/instance attributes, but local variables should just have plain names like step.


def step(f: "Future[_T]") -> None:
_step.set()

self.io_loop.add_future(self.future, step) # type: ignore
await _step.wait()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have expected this to be simply await self.future. We might need to create an Event for some edge cases (concurrent futures, or the moment singleton), but most of the time I think awaiting the future directly would work.

self.future = None
try:
exc_info = None

def run(self) -> None:
"""Starts or resumes the generator, running until it reaches a
yield point that is not ready.
"""
if self.running or self.finished:
return
try:
self.running = True
while True:
future = self.future
if future is None:
raise Exception("No pending future")
if not future.done():
return
self.future = None
try:
exc_info = None
value = future.result()
except Exception:
exc_info = sys.exc_info()
future = None

if exc_info is not None:
try:
value = future.result()
except Exception:
exc_info = sys.exc_info()
future = None

if exc_info is not None:
try:
yielded = self.gen.throw(*exc_info) # type: ignore
finally:
# Break up a reference to itself
# for faster GC on CPython.
exc_info = None
else:
yielded = self.gen.send(value)
yielded = self.gen.throw(*exc_info) # type: ignore
finally:
# Break up a reference to itself
# for faster GC on CPython.
exc_info = None
else:
yielded = self.gen.send(value)

except (StopIteration, Return) as e:
self.finished = True
self.future = _null_future
future_set_result_unless_cancelled(
self.result_future, _value_from_stopiteration(e)
)
self.result_future = None # type: ignore
return
except Exception:
self.finished = True
self.future = _null_future
future_set_exc_info(self.result_future, sys.exc_info())
self.result_future = None # type: ignore
return
if not self.handle_yield(yielded):
return
yielded = None
finally:
self.running = False

def handle_yield(self, yielded: _Yieldable) -> bool:
except (StopIteration, Return) as e:
self.finished = True
self.future = _null_future
future_set_result_unless_cancelled(
self.result_future, _value_from_stopiteration(e)
)
self.result_future = None # type: ignore
return
except Exception:
self.finished = True
self.future = _null_future
future_set_exc_info(self.result_future, sys.exc_info())
self.result_future = None # type: ignore
return

self.handle_yield(yielded)
yielded = None
if self.future is moment:
await sleep(0)

def handle_yield(self, yielded: _Yieldable) -> None:
try:
self.future = convert_yielded(yielded)
except BadYieldError:
self.future = Future()
future_set_exc_info(self.future, sys.exc_info())

if self.future is moment:
self.io_loop.add_callback(self.run)
return False
elif self.future is None:
raise Exception("no pending future")
elif not self.future.done():

def inner(f: Any) -> None:
# Break a reference cycle to speed GC.
f = None # noqa: F841
self.run()

self.io_loop.add_future(self.future, inner)
return False
return True

def handle_exception(
self, typ: Type[Exception], value: Exception, tb: types.TracebackType
) -> bool:
if not self.running and not self.finished:
self.future = Future()
future_set_exc_info(self.future, (typ, value, tb))
self.run()
return True
else:
return False
def finish(self, future: "Future[_T]") -> None:
if future.cancelled():
self.task.cancel()
self.future.cancel() # type: ignore


# Convert Awaitables into Futures.
Expand Down
6 changes: 5 additions & 1 deletion tornado/ioloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,11 @@ def run() -> None:
future_cell[0] = fut
fut.set_result(result)
assert future_cell[0] is not None
self.add_future(future_cell[0], lambda future: self.stop())

def _stop(f: "Future[_T]") -> None:
f.add_done_callback(lambda _: self.stop())

self.add_future(future_cell[0], _stop)

self.add_callback(run)
if timeout is not None:
Expand Down
22 changes: 20 additions & 2 deletions tornado/test/gen_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,23 @@ def f():
self.assertEqual(ret, [1, 1])
self.finished = True

def test_coroutine_context(self):
@gen.coroutine
def f():
current_task = getattr(asyncio, "current_task", None)
if current_task is None:
current_task = getattr(asyncio.Task, "current_task", None)
task = current_task()
assert task
_id = id(task)
yield gen.moment
task = current_task()
assert task
assert _id == id(task)

self.io_loop.run_sync(f, timeout=3)
self.finished = True


class GenCoroutineSequenceHandler(RequestHandler):
@gen.coroutine
Expand Down Expand Up @@ -995,8 +1012,9 @@ def do_something():
yield gen.sleep(0.2)

loop.run_sync(do_something)
loop.close()
gc.collect()
with ExpectLog("asyncio", "Task was destroyed but it is pending"):
loop.close()
gc.collect()
# Future was collected
self.assertIs(wfut[0](), None)
# At least one wakeup
Expand Down