Skip to content

Commit 191708c

Browse files
GH-66285: fix forking in asyncio (#99769)
Closes #66285
1 parent 024ac54 commit 191708c

File tree

3 files changed

+110
-0
lines changed

3 files changed

+110
-0
lines changed

Lib/asyncio/events.py

+11
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import contextvars
1515
import os
16+
import signal
1617
import socket
1718
import subprocess
1819
import sys
@@ -842,3 +843,13 @@ def set_child_watcher(watcher):
842843
_c_get_running_loop = get_running_loop
843844
_c_get_event_loop = get_event_loop
844845
_c__get_event_loop = _get_event_loop
846+
847+
848+
if hasattr(os, 'fork'):
849+
def on_fork():
850+
# Reset the loop and wakeupfd in the forked child process.
851+
if _event_loop_policy is not None:
852+
_event_loop_policy._local = BaseDefaultEventLoopPolicy._Local()
853+
signal.set_wakeup_fd(-1)
854+
855+
os.register_at_fork(after_in_child=on_fork)

Lib/test/test_asyncio/test_unix_events.py

+98
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import contextlib
44
import errno
55
import io
6+
import multiprocessing
67
import os
78
import pathlib
89
import signal
@@ -15,6 +16,8 @@
1516
import warnings
1617
from test.support import os_helper
1718
from test.support import socket_helper
19+
from test.support import wait_process
20+
from test.support import hashlib_helper
1821

1922
if sys.platform == 'win32':
2023
raise unittest.SkipTest('UNIX only')
@@ -1867,5 +1870,100 @@ async def runner():
18671870
wsock.close()
18681871

18691872

1873+
@unittest.skipUnless(hasattr(os, 'fork'), 'requires os.fork()')
1874+
class TestFork(unittest.IsolatedAsyncioTestCase):
1875+
1876+
async def test_fork_not_share_event_loop(self):
1877+
# The forked process should not share the event loop with the parent
1878+
loop = asyncio.get_running_loop()
1879+
r, w = os.pipe()
1880+
self.addCleanup(os.close, r)
1881+
self.addCleanup(os.close, w)
1882+
pid = os.fork()
1883+
if pid == 0:
1884+
# child
1885+
try:
1886+
loop = asyncio.get_event_loop_policy().get_event_loop()
1887+
os.write(w, str(id(loop)).encode())
1888+
finally:
1889+
os._exit(0)
1890+
else:
1891+
# parent
1892+
child_loop = int(os.read(r, 100).decode())
1893+
self.assertNotEqual(child_loop, id(loop))
1894+
wait_process(pid, exitcode=0)
1895+
1896+
@hashlib_helper.requires_hashdigest('md5')
1897+
def test_fork_signal_handling(self):
1898+
# Sending signal to the forked process should not affect the parent
1899+
# process
1900+
ctx = multiprocessing.get_context('fork')
1901+
manager = ctx.Manager()
1902+
self.addCleanup(manager.shutdown)
1903+
child_started = manager.Event()
1904+
child_handled = manager.Event()
1905+
parent_handled = manager.Event()
1906+
1907+
def child_main():
1908+
signal.signal(signal.SIGTERM, lambda *args: child_handled.set())
1909+
child_started.set()
1910+
time.sleep(1)
1911+
1912+
async def main():
1913+
loop = asyncio.get_running_loop()
1914+
loop.add_signal_handler(signal.SIGTERM, lambda *args: parent_handled.set())
1915+
1916+
process = ctx.Process(target=child_main)
1917+
process.start()
1918+
child_started.wait()
1919+
os.kill(process.pid, signal.SIGTERM)
1920+
process.join()
1921+
1922+
async def func():
1923+
await asyncio.sleep(0.1)
1924+
return 42
1925+
1926+
# Test parent's loop is still functional
1927+
self.assertEqual(await asyncio.create_task(func()), 42)
1928+
1929+
asyncio.run(main())
1930+
1931+
self.assertFalse(parent_handled.is_set())
1932+
self.assertTrue(child_handled.is_set())
1933+
1934+
@hashlib_helper.requires_hashdigest('md5')
1935+
def test_fork_asyncio_run(self):
1936+
ctx = multiprocessing.get_context('fork')
1937+
manager = ctx.Manager()
1938+
self.addCleanup(manager.shutdown)
1939+
result = manager.Value('i', 0)
1940+
1941+
async def child_main():
1942+
await asyncio.sleep(0.1)
1943+
result.value = 42
1944+
1945+
process = ctx.Process(target=lambda: asyncio.run(child_main()))
1946+
process.start()
1947+
process.join()
1948+
1949+
self.assertEqual(result.value, 42)
1950+
1951+
@hashlib_helper.requires_hashdigest('md5')
1952+
def test_fork_asyncio_subprocess(self):
1953+
ctx = multiprocessing.get_context('fork')
1954+
manager = ctx.Manager()
1955+
self.addCleanup(manager.shutdown)
1956+
result = manager.Value('i', 1)
1957+
1958+
async def child_main():
1959+
proc = await asyncio.create_subprocess_exec(sys.executable, '-c', 'pass')
1960+
result.value = await proc.wait()
1961+
1962+
process = ctx.Process(target=lambda: asyncio.run(child_main()))
1963+
process.start()
1964+
process.join()
1965+
1966+
self.assertEqual(result.value, 0)
1967+
18701968
if __name__ == '__main__':
18711969
unittest.main()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix :mod:`asyncio` to not share event loop and signal wakeupfd in forked processes. Patch by Kumar Aditya.

0 commit comments

Comments
 (0)