Skip to content

Commit 0c1fbc1

Browse files
GH-66285: fix forking in asyncio (#99539)
`asyncio` now does not shares event loop and signal wakeupfd in forked processes.
1 parent 9dc0836 commit 0c1fbc1

File tree

3 files changed

+105
-0
lines changed

3 files changed

+105
-0
lines changed

Lib/asyncio/events.py

+9
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import subprocess
1818
import sys
1919
import threading
20+
import signal
2021

2122
from . import format_helpers
2223

@@ -665,6 +666,14 @@ class _Local(threading.local):
665666

666667
def __init__(self):
667668
self._local = self._Local()
669+
if hasattr(os, 'fork'):
670+
def on_fork():
671+
# Reset the loop and wakeupfd in the forked child process.
672+
self._local = self._Local()
673+
signal.set_wakeup_fd(-1)
674+
675+
os.register_at_fork(after_in_child=on_fork)
676+
668677

669678
def get_event_loop(self):
670679
"""Get the event loop for the current context.

Lib/test/test_asyncio/test_unix_events.py

+95
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@
1111
import sys
1212
import threading
1313
import unittest
14+
import time
1415
from unittest import mock
1516
import warnings
17+
import multiprocessing
1618
from test.support import os_helper
1719
from test.support import socket_helper
20+
from test.support import wait_process
1821

1922
if sys.platform == 'win32':
2023
raise unittest.SkipTest('UNIX only')
@@ -1867,5 +1870,97 @@ async def runner():
18671870
wsock.close()
18681871

18691872

1873+
@unittest.skipUnless(hasattr(os, 'fork'), 'requires os.fork()')
1874+
class TestFork(unittest.IsolatedAsyncioTestCase):
1875+
1876+
async def test_fork_not_share_event_loop(self):
1877+
# The forked process should not share the event loop with the parent
1878+
loop = asyncio.get_running_loop()
1879+
r, w = os.pipe()
1880+
self.addCleanup(os.close, r)
1881+
self.addCleanup(os.close, w)
1882+
pid = os.fork()
1883+
if pid == 0:
1884+
# child
1885+
try:
1886+
loop = asyncio.get_event_loop_policy().get_event_loop()
1887+
os.write(w, str(id(loop)).encode())
1888+
finally:
1889+
os._exit(0)
1890+
else:
1891+
# parent
1892+
child_loop = int(os.read(r, 100).decode())
1893+
self.assertNotEqual(child_loop, id(loop))
1894+
wait_process(pid, exitcode=0)
1895+
1896+
def test_fork_signal_handling(self):
1897+
# Sending signal to the forked process should not affect the parent
1898+
# process
1899+
ctx = multiprocessing.get_context('fork')
1900+
manager = ctx.Manager()
1901+
self.addCleanup(manager.shutdown)
1902+
child_started = manager.Event()
1903+
child_handled = manager.Event()
1904+
parent_handled = manager.Event()
1905+
1906+
def child_main():
1907+
signal.signal(signal.SIGTERM, lambda *args: child_handled.set())
1908+
child_started.set()
1909+
time.sleep(1)
1910+
1911+
async def main():
1912+
loop = asyncio.get_running_loop()
1913+
loop.add_signal_handler(signal.SIGTERM, lambda *args: parent_handled.set())
1914+
1915+
process = ctx.Process(target=child_main)
1916+
process.start()
1917+
child_started.wait()
1918+
os.kill(process.pid, signal.SIGTERM)
1919+
process.join()
1920+
1921+
async def func():
1922+
await asyncio.sleep(0.1)
1923+
return 42
1924+
1925+
# Test parent's loop is still functional
1926+
self.assertEqual(await asyncio.create_task(func()), 42)
1927+
1928+
asyncio.run(main())
1929+
1930+
self.assertFalse(parent_handled.is_set())
1931+
self.assertTrue(child_handled.is_set())
1932+
1933+
def test_fork_asyncio_run(self):
1934+
ctx = multiprocessing.get_context('fork')
1935+
manager = ctx.Manager()
1936+
self.addCleanup(manager.shutdown)
1937+
result = manager.Value('i', 0)
1938+
1939+
async def child_main():
1940+
await asyncio.sleep(0.1)
1941+
result.value = 42
1942+
1943+
process = ctx.Process(target=lambda: asyncio.run(child_main()))
1944+
process.start()
1945+
process.join()
1946+
1947+
self.assertEqual(result.value, 42)
1948+
1949+
def test_fork_asyncio_subprocess(self):
1950+
ctx = multiprocessing.get_context('fork')
1951+
manager = ctx.Manager()
1952+
self.addCleanup(manager.shutdown)
1953+
result = manager.Value('i', 1)
1954+
1955+
async def child_main():
1956+
proc = await asyncio.create_subprocess_exec(sys.executable, '-c', 'pass')
1957+
result.value = await proc.wait()
1958+
1959+
process = ctx.Process(target=lambda: asyncio.run(child_main()))
1960+
process.start()
1961+
process.join()
1962+
1963+
self.assertEqual(result.value, 0)
1964+
18701965
if __name__ == '__main__':
18711966
unittest.main()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix :mod:`asyncio` to not share event loop and signal wakeupfd in forked processes. Patch by Kumar Aditya.

0 commit comments

Comments
 (0)