Make get_event_loop() return the current loop if called from coroutines/callbacks #452
Conversation
Looks good!
```diff
@@ -400,6 +400,8 @@ def run_forever(self):
         old_agen_hooks = sys.get_asyncgen_hooks()
         sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                                finalizer=self._asyncgen_finalizer_hook)
+        old_loop = events._get_current_loop()
+        events._set_current_loop(self)
```
This should go inside the `try` block, so that if it executes partially the `finally` will still restore the old loop.
Sure, will fix this.
```diff
@@ -607,6 +607,27 @@ def new_event_loop(self):
 # Lock for protecting the on-the-fly creation of the event loop policy.
 _lock = threading.Lock()
 
+# A TLS for the current event loop, used by get_current_loop.
+_current_loop = threading.local()
```
Why not use a subclass (like in BaseDefaultEventLoopPolicy)?
OK, will add a subclass. I guess avoiding a `getattr` call makes the code cleaner.
```diff
@@ -449,7 +449,13 @@ def new_test_loop(self, gen=None):
         self.set_event_loop(loop)
         return loop
 
+    def setUp(self):
+        self._get_current_loop = events._get_current_loop
+        events._get_current_loop = lambda: None
```
Why not use mock.patch() as a class decorator? If I understand the docs correctly then you don't need to change everything to call super().setUp().
`mock.patch` doesn't apply to subclasses of TestCase. So we'd need to decorate all TestCases, and we have a lot of them. IMO it's cleaner to fix the `setUp` calls.
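The contrast being argued here can be sketched as follows. A patch installed in a shared base class's `setUp()` applies to every subclass automatically, whereas a `@mock.patch` class decorator would have to be repeated on each TestCase subclass. The `events` object below is a stand-in, not the real `asyncio.events` module:

```python
import unittest
from unittest import mock

# Stand-in for the asyncio.events module being patched.
class events:
    @staticmethod
    def _get_current_loop():
        return "real-loop"

class BaseTestCase(unittest.TestCase):
    def setUp(self):
        # Patch in setUp so every subclass that calls super().setUp()
        # gets the patched behaviour; addCleanup restores it per test.
        patcher = mock.patch.object(events, "_get_current_loop",
                                    return_value=None)
        patcher.start()
        self.addCleanup(patcher.stop)

class SomeAsyncioTest(BaseTestCase):
    def test_loop_is_patched(self):
        self.assertIsNone(events._get_current_loop())
```

The trade-off mentioned in the thread: this requires all existing test classes to call `super().setUp()`, which is the bulk of the patch.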
```diff
 loop = None
 
+old_policy = asyncio.get_event_loop_policy()
+asyncio.set_event_loop_policy(Policy())
```
Again, this ought to be inside the `try`.
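The structure the review asks for can be sketched like this; `Policy` here is a stand-in for the test policy in the diff above, and the shape mirrors the earlier comment about `run_forever`:

```python
import asyncio

# Stand-in test policy (the real one comes from the test suite).
class Policy(asyncio.DefaultEventLoopPolicy):
    pass

# Swap the policy inside the try block so the finally clause always
# restores the original, even if the guarded section raises.
old_policy = asyncio.get_event_loop_policy()
try:
    asyncio.set_event_loop_policy(Policy())
    policy_during = asyncio.get_event_loop_policy()
finally:
    asyncio.set_event_loop_policy(old_policy)
```

Reading `old_policy` before the `try` is fine; it's the mutation (`set_event_loop_policy`) that must be covered by the `finally`.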
OK, nm on the mock.patch() thing then.
@gvanrossum I've updated the PR.
What about my suggestion (in the previous PR, sorry) of using "active" or "running" instead of "current"? (I am coming back to this because in my head I keep missing the distinction between get_event_loop() and get_current_loop(). :-)
I like
Make it so!
Done :) Please see the latest patch.
Looks good, better than the previous PR. I have a question though. Why does
"""Equivalent to calling get_event_loop_policy().get_event_loop().""" | ||
"""Return an asyncio event loop. | ||
|
||
When called from coroutines, this function will always return the |
I'd say "When called from a coroutine or a callback (e.g. scheduled with `call_soon` or a similar API), this function ..."
The more I think about it the more I agree. This would be one scenario where we have a chance to detect accidental blocking inside a coroutine or callback. @1st1 what do you think?
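The behaviour being documented can be demonstrated with a short self-contained example (my sketch, using only the public API): with this change, calling `get_event_loop()` from inside a coroutine returns the loop that is actually running it, regardless of which loop the policy has stored.

```python
import asyncio

async def which_loop():
    # Inside a running coroutine, get_event_loop() returns the
    # currently running loop rather than the policy's loop.
    return asyncio.get_event_loop()

loop = asyncio.new_event_loop()
try:
    inner = loop.run_until_complete(which_loop())
finally:
    loop.close()
```

Here `inner is loop` holds even though `loop` was never installed via `set_event_loop()`, which is exactly the new behaviour this PR introduces.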
Agree. Stack Overflow has several examples where authors try to hide a high-level API under
(off-topic) We should have examples of a better recommended practice. IMO such high-level APIs should fork a background thread to run the event loop and use concurrent.futures.Future instances to schedule calls and be able to wait for them.
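The practice described above can be sketched as follows. `LoopThread` is an illustrative name of my own, not an asyncio API; the key building block, `asyncio.run_coroutine_threadsafe`, does return a `concurrent.futures.Future` that synchronous callers can wait on.

```python
import asyncio
import threading

class LoopThread:
    """Run an event loop in a background thread; schedule work from
    synchronous code via concurrent.futures.Future objects."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def submit(self, coro):
        # Returns a concurrent.futures.Future tied to the coroutine.
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    def close(self):
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join()

async def add(a, b):
    return a + b
```

A caller then does `LoopThread().submit(add(1, 2)).result()` from plain synchronous code, without ever touching the event loop directly.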
I have an asyncio executor example that does pretty much that. I don't think it would fit in the official documentation, but maybe on asyncio-doc?
Yeah, let's do it. I'll update the PR.
Updated.
```diff
@@ -394,13 +394,15 @@ def run_forever(self):
         self._check_closed()
         if self.is_running():
             raise RuntimeError('Event loop is running.')
```
Would it be easy to change this error to "This event loop is already running." ?
> Would it be easy to change this error to "This event loop is already running." ?

Done. Now it's clearer.
LGTM! Go ahead and merge.
Also:
* Raise when a user attempts to run a loop within another loop;
* Clarify error message when the loop is already running.
Thank you for the reviews @gvanrossum @vxgmichel @asvetlov!
PS. Why did the final squashed commit claim failing tests? I didn't see a Travis CI link.
Short answer: race between Travis & GH (at least that's what it looks like). Long answer: I squashed everything and force pushed it to the PR. When I merged it to the
The master looks green: https://travis-ci.org/python/asyncio/builds/173340363
Oh, your workflow is so different from mine... I use the web squash flow.

--Guido (mobile)
this broke being able to use asyncio from forked processes created from a run loop :( testcase:

```python
import asyncio
import multiprocessing

def sub_proc():
    async def doit():
        print("hello")
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(doit())

async def main():
    proc = multiprocessing.Process(target=sub_proc)
    proc.start()
    proc.join()

if __name__ == '__main__':
    _loop = asyncio.get_event_loop()
    _loop.run_until_complete(main())
```

results in:
this works in Python 3.5.2 but fails in 3.5.3. I'm guessing this exacerbated https://bugs.python.org/issue22087
Forking from a running event loop triggers many undefined behaviours. Just don't do it.
this is a really unfortunate response, as this is a common workflow for async unittests (https://github.com/Martiusweb/asynctest) and would require ALL workers to be created before your first async method, which for things like pools cannot be guaranteed. Further, it would break scenarios where you have sub-processes re-spawned after serving x number of requests. I've been using this workflow for over a year in a production environment without issue.

Unless the point of this PR was to break this workflow, I think it should be addressed. What this PR is saying is that async processes cannot launch async sub-processes, which was never previously stated or enforced. Further, in general, async forking support has been mothballed for more than a year and really needs to be addressed per the issue I referenced above.
If one package does something in some particular way, it doesn't mean that it's a correct way. It means that we need to fix that one package.
No, it doesn't say that. It just happened to expose a bug (forking from a running event loop was never officially supported). Forking with a running event loop is discouraged in virtually all async frameworks. There are dozens of side effects that can't be reliably resolved in a cross-platform case. Alternative asyncio event loop implementations such as uvloop will simply segfault if you do this.

There is a way to fork reliably, it just requires some boilerplate code. The key is to fork with the loop being paused. For example (untested):

```python
import asyncio
import os

fork_cb = None

def schedule_fork(new_proc):
    global fork_cb
    fork_cb = new_proc
    asyncio.get_event_loop().stop()

async def coro():
    # some asyncio code...
    # some asyncio code...
    # some asyncio code...

    def new_proc():
        print('forked process', os.getpid())

    print('forking from', os.getpid())
    schedule_fork(new_proc)

loop = asyncio.get_event_loop()
loop.create_task(coro())
# Create more tasks...

while True:
    loop.run_forever()
    if fork_cb is not None:
        cb, fork_cb = fork_cb, None
        if not os.fork():
            try:
                cb()
            finally:
                os._exit(0)
    else:
        break
```

A workaround would be to store PID in the threadlocal object that
I think we actually can add built-in support for forking to asyncio: a coroutine

```python
async def some_coro():
    await asyncio.fork(code_for_child_process)
```

I'll experiment with the idea and maybe propose this in 3.7.
@Martiusweb Would you be able to fix forking in asynctest by using the approach outlined in the above ^^ comment?
FYI asynctest doesn't directly fork, it's just users of asynctest like myself who happen to fork (launch an aiohttp server in a sub-process and then run tests).

IMHO the workaround you mentioned is not robust, in that one may not control the main run loop (like when running an aiohttp server), and I also worry about the heavy hand of stopping the event loop for complicated applications which have many tasks blocked waiting for input, timers, and tasks waiting on various events. In my example I run an aiohttp server that dynamically launches processes from a worker pool to process tens of thousands of S3 requests. IMO if uvloop crashes, that would be a problem in its run-loop impl and should be addressed separately, not affect the BaseEventLoop impl.

You also say forking from a run loop was never officially supported, but by the same argument it was never officially not-supported either, from what I gather from the docs (https://docs.python.org/3/library/asyncio-eventloop.html), and it worked. You say there are many problems with this workflow, but there's been a proposed patch for over a year that has sat idle. IMHO this is being punted for "being too hard" without justification, given how easily it was worked around in the past.

I would imagine the easiest solution is to have forked processes NOT inherit the event loop from the parent... anything else I can see being uber complicated. I realize I have not dug into the innards of the BaseEventLoop impl, but I'm trying to fight for what I think is a very useful workflow that should not be trivially waved off as unimportant. The argument given against this, to me, feels like a reason for people to version lock to minor releases. It changes something from being trivial to use to very complicated and cumbersome without a very icky hack for sub-processes I had to come up with:
FYI another example that just uses asyncio to fork:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def sub_proc():
    async def doit():
        print("hello")
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(doit())

async def main():
    ppool = ProcessPoolExecutor()
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(ppool, sub_proc)

if __name__ == '__main__':
    _loop = asyncio.get_event_loop()
    _loop.run_until_complete(main())
```

I think the justification needs to be made whether Python's base modules are fork-safe or not. If asyncio is not fork-safe, this seems to go against the other Python base modules. What work is required to be able to subclass the BaseEventLoop to make it fork-safe?
Forking from within a running HTTP server coroutine is a very bad idea. It's a blocking operation that might take a non-trivial amount of time. You'd better pre-fork, or have a master process that controls its child processes and creates new ones when needed. You can also use
The event loop can safely be stopped and resumed, no matter how many timers/tasks you have. If you care about timeouts being triggered because you pause the loop to fork -- the same will happen when you use
True (and that segfault will soon be fixed). But there are problems even with forking pure-Python asyncio programs. For instance,

One solution is to use the
You can also use

Calling bare

gevent, for instance, monkey patches
Strictly speaking any network application (blocking or non-blocking) is not
True, this is something we will hopefully fix very soon.
thanks for the insight, but I think our streams aren't quite crossing yet :) I never stated the workflows involved directly forking; I gave two examples that this PR broke without using fork directly. Also you can create a sub-process without blocking the parent process by using
Right. I tried your script from #452 (comment) and indeed what should work doesn't work! Seems like it's a bug after all, I'll open a PR.
yay =) ya it's the same problem as the first example.
See PR python#452 for context.
As discussed in [1] and [2], this PR makes `asyncio.get_event_loop()` always return the currently running event loop when it is called from a coroutine.

The patch adds two new functions: `asyncio._set_current_loop()` and `asyncio._get_current_loop()`. Both functions are thread-specific and are considered low-level APIs, intended to be used by third-party loop implementations only. Even though they have a leading underscore, they will be documented as public asyncio API.

`loop.run_forever()` is modified to call `asyncio._set_current_loop()` and `asyncio._get_current_loop()`.

`asyncio.get_event_loop()` is modified to check if there is a current loop set by calling `asyncio._get_current_loop()`. If there is no current event loop, it returns the result of `get_event_loop_policy().get_event_loop()` (old behaviour).

The biggest part of the patch is fixing unit tests: all test classes call `super().setUp()`. `test_utils.TestCase.setUp` patches `asyncio._get_current_loop()` to always return `None`. This change is to make sure that the loop is still being passed explicitly within asyncio.

[1] PR: #355
[2] https://groups.google.com/d/msg/python-tulip/yF9C-rFpiKk/tk5oA3GLHAAJ