Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-22087: Fix Policy.get_event_loop() to detect fork #7208

Merged
merged 2 commits into from
May 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Lib/asyncio/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,16 +625,23 @@ class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):

class _Local(threading.local):
_loop = None
_pid = None
_set_called = False

def __init__(self):
self._local = self._Local()
self._local._pid = os.getpid()

def get_event_loop(self):
"""Get the event loop.

This may be None or an instance of EventLoop.
"""
if self._local._pid != os.getpid():
# If we detect we're in a child process forked by multiprocessing,
# we reset self._local so that we'll get a new event loop.
self._local = self._Local()

if (self._local._loop is None and
not self._local._set_called and
isinstance(threading.current_thread(), threading._MainThread)):
Expand Down
32 changes: 32 additions & 0 deletions Lib/test/test_asyncio/test_unix_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import tempfile
import threading
import unittest
import multiprocessing
from unittest import mock
from test import support

Expand Down Expand Up @@ -1804,6 +1805,37 @@ def create_watcher(self):
return asyncio.FastChildWatcher()


class ForkedProcessTests(unittest.TestCase):
def setUp(self):
self.parent_loop = asyncio.SelectorEventLoop()
asyncio.set_event_loop(self.parent_loop)
self.ctx = multiprocessing.get_context("fork")

def tearDown(self):
self.parent_loop.close()

def _check_loops_not_equal(self, old_loop):
loop = asyncio.get_event_loop()
if loop is old_loop:
raise RuntimeError("Child process inherited parent's event loop")

try:
val = loop.run_until_complete(asyncio.sleep(0.05, result=42))
if val != 42:
raise RuntimeError("new event loop does not work")
finally:
loop.close()

sys.exit(loop is old_loop)

def test_new_loop_in_child(self):
p = self.ctx.Process(target=self._check_loops_not_equal,
args=(self.parent_loop,))
p.start()
p.join()
self.assertEqual(p.exitcode, 0)


class PolicyTests(unittest.TestCase):

def create_policy(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fix Policy.get_event_loop() to detect fork and return a new loop.

Original patch by Dan O'Reilly.