Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-97696: DRAFT asyncio eager tasks factory prototype #101613

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
1ea845c
Copy GH-98137, make some changes, add benchmarking script, trying to …
itamaro Feb 1, 2023
de6d910
asyncio runner takes optional task factory arg
itamaro Feb 3, 2023
56610e7
don't coro.send if the event loop is not running (yet)
itamaro Feb 3, 2023
34082a7
modify async tree script to support eager factory
itamaro Feb 3, 2023
ce5beaf
don't over-count tasks that yield result immediately
itamaro Feb 3, 2023
38d7b0b
handle task._source_traceback in eager task factory
itamaro Feb 6, 2023
6afbaab
stop checking for eager factory in taskgroups
itamaro Feb 10, 2023
a6e68bc
refactor async tree benchmark to work with TaskGroup or gather depend…
itamaro Feb 10, 2023
464bd49
restore C task
itamaro Feb 10, 2023
ac26ad6
yield_result -> coro_result
itamaro Feb 10, 2023
ae2dcf3
Support `coro_result` in Task C impl
itamaro Feb 10, 2023
9794715
Merge branch 'main' into asyncio-eager-tasks-playground
itamaro Feb 22, 2023
0b447b0
Address Dino's review comments
itamaro Feb 22, 2023
f2748e2
passthrough coro_result from custom constructor only if it is set
itamaro Feb 25, 2023
61ac5d0
add != NULL assertion on step2 result
itamaro Feb 26, 2023
cbd14fb
Merge branch 'main' into asyncio-eager-tasks-playground
itamaro Mar 3, 2023
3503337
fix result refcnting in task_step_handle_result_impl
itamaro Mar 10, 2023
49c0e89
Add eager task factory tests
itamaro Mar 11, 2023
8a5229a
cleanup eager task factory tests
itamaro Mar 15, 2023
d1e5fd1
add news blurb
itamaro Mar 15, 2023
580fb1f
Merge branch 'main' into asyncio-eager-tasks-playground
itamaro Mar 15, 2023
6284c41
apply patchcheck fixes in asyncio.tasks
itamaro Mar 15, 2023
34123a7
regenerate clinic
itamaro Mar 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Include/internal/pycore_global_objects_fini_generated.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Include/internal/pycore_global_strings.h
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ struct _Py_global_strings {
STRUCT_FOR_ID(copy)
STRUCT_FOR_ID(copyreg)
STRUCT_FOR_ID(coro)
STRUCT_FOR_ID(coro_result)
STRUCT_FOR_ID(count)
STRUCT_FOR_ID(cwd)
STRUCT_FOR_ID(d)
Expand Down
1 change: 1 addition & 0 deletions Include/internal/pycore_runtime_init_generated.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Include/internal/pycore_unicodeobject_generated.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 34 additions & 1 deletion Lib/asyncio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,40 @@
tasks.__all__ +
threads.__all__ +
timeouts.__all__ +
transports.__all__)
transports.__all__ + (
'create_eager_task_factory',
'eager_task_factory',
))

# throwing things here temporarily to defer premature dir layout bikeshedding

def create_eager_task_factory(custom_task_constructor):

def factory(loop, coro, *, name=None, context=None):
loop._check_closed()
if not loop.is_running():
return custom_task_constructor(coro, loop=loop, name=name, context=context)

try:
result = coro.send(None)
except StopIteration as si:
fut = loop.create_future()
fut.set_result(si.value)
return fut
except Exception as ex:
fut = loop.create_future()
fut.set_exception(ex)
return fut
else:
task = custom_task_constructor(
coro, loop=loop, name=name, context=context, coro_result=result)
if task._source_traceback:
del task._source_traceback[-1]
return task

return factory

eager_task_factory = create_eager_task_factory(Task)

if sys.platform == 'win32': # pragma: no cover
from .windows_events import *
Expand Down
9 changes: 6 additions & 3 deletions Lib/asyncio/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ class Runner:

# Note: the class is final, it is not intended for inheritance.

def __init__(self, *, debug=None, loop_factory=None):
def __init__(self, *, debug=None, loop_factory=None, task_factory=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need a separate task_factory here? Ultimately this is just influencing the loop after it's created, couldn't that be rolled into the existing loop_factory mechanism?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no we don't need this. it was convenient to add it for my testing. sure, the same result can be achieved with the loop factory, but this feels cleaner, so I figured I'd suggest it and see what others think

self._state = _State.CREATED
self._debug = debug
self._loop_factory = loop_factory
self._task_factory = task_factory
self._loop = None
self._context = None
self._interrupt_count = 0
Expand Down Expand Up @@ -144,6 +145,8 @@ def _lazy_init(self):
self._loop = self._loop_factory()
if self._debug is not None:
self._loop.set_debug(self._debug)
if self._task_factory is not None:
self._loop.set_task_factory(self._task_factory)
self._context = contextvars.copy_context()
self._state = _State.INITIALIZED

Expand All @@ -157,7 +160,7 @@ def _on_sigint(self, signum, frame, main_task):
raise KeyboardInterrupt()


def run(main, *, debug=None, loop_factory=None):
def run(main, *, debug=None, loop_factory=None, task_factory=None):
"""Execute the coroutine and return the result.

This function runs the passed coroutine, taking care of
Expand Down Expand Up @@ -190,7 +193,7 @@ async def main():
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")

with Runner(debug=debug, loop_factory=loop_factory) as runner:
with Runner(debug=debug, loop_factory=loop_factory, task_factory=task_factory) as runner:
return runner.run(main)


Expand Down
3 changes: 2 additions & 1 deletion Lib/asyncio/taskgroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ def create_task(self, coro, *, name=None, context=None):
task = self._loop.create_task(coro)
else:
task = self._loop.create_task(coro, context=context)
tasks._set_task_name(task, name)
if name is not None and not task.done(): # If it's done already, it's a future
tasks._set_task_name(task, name)
task.add_done_callback(self._on_task_done)
self._tasks.add(task)
return task
Expand Down
95 changes: 52 additions & 43 deletions Lib/asyncio/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ def _set_task_name(task, name):
set_name(name)


_NOT_SET = object()

class Task(futures._PyFuture): # Inherit Python Task implementation
# from a Python Future implementation.

Expand All @@ -94,7 +96,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
# status is still pending
_log_destroy_pending = True

def __init__(self, coro, *, loop=None, name=None, context=None):
def __init__(self, coro, *, loop=None, name=None, context=None,
coro_result=_NOT_SET):
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
Expand All @@ -118,7 +121,10 @@ def __init__(self, coro, *, loop=None, name=None, context=None):
else:
self._context = context

self._loop.call_soon(self.__step, context=self._context)
if coro_result is _NOT_SET:
self._loop.call_soon(self.__step, context=self._context)
else:
self.__step_handle_result(coro_result)
_register_task(self)

def __del__(self):
Expand Down Expand Up @@ -288,55 +294,58 @@ def __step(self, exc=None):
except BaseException as exc:
super().set_exception(exc)
else:
blocking = getattr(result, '_asyncio_future_blocking', None)
if blocking is not None:
self.__step_handle_result(result)
finally:
_leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs.

def __step_handle_result(self, result):
blocking = getattr(result, '_asyncio_future_blocking', None)
if blocking is not None:
# Yielded Future must come from Future.__iter__().
if futures._get_loop(result) is not self._loop:
if futures._get_loop(result) is not self._loop:
new_exc = RuntimeError(
f'Task {self!r} got Future '
f'{result!r} attached to a different loop')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
elif blocking:
if result is self:
new_exc = RuntimeError(
f'Task {self!r} got Future '
f'{result!r} attached to a different loop')
f'Task cannot await on itself: {self!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
elif blocking:
if result is self:
new_exc = RuntimeError(
f'Task cannot await on itself: {self!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
else:
result._asyncio_future_blocking = False
result.add_done_callback(
self.__wakeup, context=self._context)
self._fut_waiter = result
if self._must_cancel:
if self._fut_waiter.cancel(
msg=self._cancel_message):
self._must_cancel = False
else:
new_exc = RuntimeError(
f'yield was used instead of yield from '
f'in task {self!r} with {result!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)

elif result is None:
# Bare yield relinquishes control for one event loop iteration.
self._loop.call_soon(self.__step, context=self._context)
elif inspect.isgenerator(result):
# Yielding a generator is just wrong.
new_exc = RuntimeError(
f'yield was used instead of yield from for '
f'generator in task {self!r} with {result!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
result._asyncio_future_blocking = False
result.add_done_callback(
self.__wakeup, context=self._context)
self._fut_waiter = result
if self._must_cancel:
if self._fut_waiter.cancel(
msg=self._cancel_message):
self._must_cancel = False
else:
# Yielding something else is an error.
new_exc = RuntimeError(f'Task got bad yield: {result!r}')
new_exc = RuntimeError(
f'yield was used instead of yield from '
f'in task {self!r} with {result!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
finally:
_leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs.

elif result is None:
# Bare yield relinquishes control for one event loop iteration.
self._loop.call_soon(self.__step, context=self._context)
elif inspect.isgenerator(result):
# Yielding a generator is just wrong.
new_exc = RuntimeError(
f'yield was used instead of yield from for '
f'generator in task {self!r} with {result!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
else:
# Yielding something else is an error.
new_exc = RuntimeError(f'Task got bad yield: {result!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)

def __wakeup(self, future):
try:
Expand Down
Loading