From 70a35a2fe38dcae7d176ce200d9bf902cc4d7e4b Mon Sep 17 00:00:00 2001 From: Yonatan Bitton Date: Sun, 16 Jul 2023 14:48:29 +0300 Subject: [PATCH 01/11] Updated Resource Tracker to return exit code based on leaking found or not and added tests for that --- Lib/multiprocessing/resource_tracker.py | 8 +++++- Lib/test/_test_multiprocessing.py | 34 +++++++++++++++++++++++++ Lib/test/test_concurrent_futures.py | 29 +++++++++++++++++++++ 3 files changed, 70 insertions(+), 1 deletion(-) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index ea369507297f86..3d29d638782679 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -57,6 +57,7 @@ def __init__(self): self._lock = threading.Lock() self._fd = None self._pid = None + self._exitcode = None def _stop(self): with self._lock: @@ -68,7 +69,7 @@ def _stop(self): os.close(self._fd) self._fd = None - os.waitpid(self._pid, 0) + _, self._exitcode = os.waitpid(self._pid, 0) self._pid = None def getfd(self): @@ -191,6 +192,8 @@ def main(fd): pass cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()} + exit_code = 0 + try: # keep track of registered/unregistered resources with open(fd, 'rb') as f: @@ -221,6 +224,7 @@ def main(fd): for rtype, rtype_cache in cache.items(): if rtype_cache: try: + exit_code = 1 warnings.warn('resource_tracker: There appear to be %d ' 'leaked %s objects to clean up at shutdown' % (len(rtype_cache), rtype)) @@ -237,3 +241,5 @@ def main(fd): warnings.warn('resource_tracker: %r: %s' % (name, e)) finally: pass + + sys.exit(exit_code) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index c1f9487ae80511..624ed376d9a39d 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -5536,6 +5536,40 @@ def test_too_long_name_resource(self): with self.assertRaises(ValueError): resource_tracker.register(too_long_name_resource, rtype) + def _test_resource_tracker_leak_resources(self, context, delete_queue): + from multiprocessing.resource_tracker import _resource_tracker + _resource_tracker.ensure_running() + self.assertTrue(_resource_tracker._check_alive()) + + # Reset exit code value + _resource_tracker._exitcode = None + exit_code_assert = self.assertNotEqual + + mp_context = multiprocessing.get_context(context) + + # Keep it on variable, so it won't be cleared yet + q = mp_context.Queue() + if delete_queue: + del q + exit_code_assert = self.assertEqual + + self.assertIsNone(_resource_tracker._exitcode) + _resource_tracker._stop() + + exit_code_assert(_resource_tracker._exitcode, 0) + + def test_resource_tracker_should_return_0_exit_code_when_no_resources_were_leaked_spawn_ctx(self): + self._test_resource_tracker_leak_resources(context="spawn", delete_queue=True) + + def test_resource_tracker_should_return_non_0_exit_code_when_resources_were_leaked_spawn_ctx(self): + self._test_resource_tracker_leak_resources(context="spawn", delete_queue=False) + + def test_resource_tracker_should_return_0_exit_code_when_no_resources_were_leaked_forkserver_ctx(self): + self._test_resource_tracker_leak_resources(context="forkserver", delete_queue=True) + + def test_resource_tracker_should_return_non_0_exit_code_when_resources_were_leaked_forkserver_ctx(self): + self._test_resource_tracker_leak_resources(context="forkserver", delete_queue=False) + class TestSimpleQueue(unittest.TestCase): diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 39dbe234e765e8..5087c48feb15aa 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -290,6 +290,35 @@ def _assert_logged(self, msg): create_executor_tests(FailingInitializerMixin) +@unittest.skipIf(sys.platform == "win32", "Resource Tracker doesn't run on Windows") +class FailingInitializerResourcesTest(unittest.TestCase): + """ + Source: https://github.com/python/cpython/issues/104090 + """ + + def _test(self, test_class): + runner = unittest.TextTestRunner() + result = runner.run(test_class('test_initializer')) + + self.assertEqual(result.testsRun, 1) + self.assertEqual(result.failures, []) + self.assertEqual(result.errors, []) + + # GH-104090: + # Stop resource tracker manually now, so we can verify there are not leaked resources by checking + # the process exit code + from multiprocessing.resource_tracker import _resource_tracker + _resource_tracker._stop() + + self.assertEqual(_resource_tracker._exitcode, 0) + + def test_spawn(self): + self._test(ProcessPoolSpawnFailingInitializerTest) + + def test_forkserver(self): + self._test(ProcessPoolForkserverFailingInitializerTest) + + class ExecutorShutdownTest: def test_run_after_shutdown(self): self.executor.shutdown() From da7e245cf0070495956a9ff3c3a21036f6c3cc16 Mon Sep 17 00:00:00 2001 From: Yonatan Bitton Date: Sun, 16 Jul 2023 15:03:14 +0300 Subject: [PATCH 02/11] Add NEWS file --- .../2023-07-16-15-02-47.gh-issue-104090.oMjNa9.rst | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 Misc/NEWS.d/next/Core and Builtins/2023-07-16-15-02-47.gh-issue-104090.oMjNa9.rst diff --git a/Misc/NEWS.d/next/Core and Builtins/2023-07-16-15-02-47.gh-issue-104090.oMjNa9.rst b/Misc/NEWS.d/next/Core and Builtins/2023-07-16-15-02-47.gh-issue-104090.oMjNa9.rst new file mode 100644 index 00000000000000..f48202eaf04e44 --- /dev/null +++ b/Misc/NEWS.d/next/Core and Builtins/2023-07-16-15-02-47.gh-issue-104090.oMjNa9.rst @@ -0,0 +1,2 @@ +Updated Resource Tracker to return exit code based on resource leaked found +or not From 1de022d390734727c4a857a8d2ca3ad25e63afe6 Mon Sep 17 00:00:00 2001 From: Yonatan Bitton Date: Mon, 17 Jul 2023 18:48:44 +0300 Subject: [PATCH 03/11] Use subTest for the parametrize the same tests with different values --- Lib/test/_test_multiprocessing.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 624ed376d9a39d..abe03b940cc198 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -5558,17 +5558,14 @@ def _test_resource_tracker_leak_resources(self, context, delete_queue): exit_code_assert(_resource_tracker._exitcode, 0) - def test_resource_tracker_should_return_0_exit_code_when_no_resources_were_leaked_spawn_ctx(self): - self._test_resource_tracker_leak_resources(context="spawn", delete_queue=True) - - def test_resource_tracker_should_return_non_0_exit_code_when_resources_were_leaked_spawn_ctx(self): - self._test_resource_tracker_leak_resources(context="spawn", delete_queue=False) - - def test_resource_tracker_should_return_0_exit_code_when_no_resources_were_leaked_forkserver_ctx(self): - self._test_resource_tracker_leak_resources(context="forkserver", delete_queue=True) - - def test_resource_tracker_should_return_non_0_exit_code_when_resources_were_leaked_forkserver_ctx(self): - self._test_resource_tracker_leak_resources(context="forkserver", delete_queue=False) + def test_resource_tracker_exit_code(self): + for context in ["spawn", "forkserver"]: + for delete_queue in [True, False]: + with self.subTest(context=context, delete_queue=delete_queue): + self._test_resource_tracker_leak_resources( + context=context, + delete_queue=delete_queue, + ) class TestSimpleQueue(unittest.TestCase): From 31bc4afae5994ee17d705f399523a52f160a7233 Mon Sep 17 00:00:00 2001 From: Yonatan Bitton Date: Tue, 18 Jul 2023 12:03:19 +0300 Subject: [PATCH 04/11] Updated tests based on PR comments, and fix the resource_tracker code to convert from status code to exit code --- Lib/multiprocessing/resource_tracker.py | 9 ++++++++- Lib/test/_test_multiprocessing.py | 6 ++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index 3d29d638782679..c75ea6f8af0333 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -69,9 +69,16 @@ def _stop(self): os.close(self._fd) self._fd = None - _, self._exitcode = os.waitpid(self._pid, 0) + _, status = os.waitpid(self._pid, 0) + self._pid = None + try: + self._exitcode = os.waitstatus_to_exitcode(status) + except ValueError: + # os.waitstatus_to_exitcode may raise an exception for invalid values + self._exitcode = None + def getfd(self): self.ensure_running() return self._fd diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index abe03b940cc198..e7238b28fddd17 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -5551,12 +5551,14 @@ def _test_resource_tracker_leak_resources(self, context, delete_queue): q = mp_context.Queue() if delete_queue: del q - exit_code_assert = self.assertEqual self.assertIsNone(_resource_tracker._exitcode) _resource_tracker._stop() - exit_code_assert(_resource_tracker._exitcode, 0) + if delete_queue: + self.assertEqual(_resource_tracker._exitcode, 0) + else: + self.assertEqual(_resource_tracker._exitcode, 1) def test_resource_tracker_exit_code(self): for context in ["spawn", "forkserver"]: From 47551a3014f98c905d9e587842fde27767b1b0f5 Mon Sep 17 00:00:00 2001 From: Yonatan Bitton Date: Tue, 18 Jul 2023 12:04:59 +0300 Subject: [PATCH 05/11] Clean test --- Lib/test/_test_multiprocessing.py | 1 - 1 file changed, 1 deletion(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index e7238b28fddd17..ba6fd4144fe676 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -5543,7 +5543,6 @@ def _test_resource_tracker_leak_resources(self, context, delete_queue): # Reset exit code value _resource_tracker._exitcode = None - exit_code_assert = self.assertNotEqual mp_context = multiprocessing.get_context(context) From a02fdbe486c2038f0d838296f4d973d2024313fd Mon Sep 17 00:00:00 2001 From: Yonatan Bitton Date: Tue, 18 Jul 2023 12:18:40 +0300 Subject: [PATCH 06/11] Added comment for test and simplified the logic --- Lib/test/_test_multiprocessing.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index ba6fd4144fe676..d0cee161251197 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -5550,16 +5550,20 @@ def _test_resource_tracker_leak_resources(self, context, delete_queue): q = mp_context.Queue() if delete_queue: del q + expected_exit_code = 0 + else: + expected_exit_code = 1 self.assertIsNone(_resource_tracker._exitcode) _resource_tracker._stop() - if delete_queue: - self.assertEqual(_resource_tracker._exitcode, 0) - else: - self.assertEqual(_resource_tracker._exitcode, 1) + self.assertEqual(_resource_tracker._exitcode, expected_exit_code) def test_resource_tracker_exit_code(self): + """ + Test the exit code of the resource tracker based on if there were left leaked resources when we stop the process. + If not leaked resources were found, exit code should be 0, otherwise 1 + """ for context in ["spawn", "forkserver"]: for delete_queue in [True, False]: with self.subTest(context=context, delete_queue=delete_queue): From 4dbb777451a2d32542995769f61a94f09a8edf0e Mon Sep 17 00:00:00 2001 From: Yonatan Bitton Date: Tue, 18 Jul 2023 13:38:29 +0300 Subject: [PATCH 07/11] Fix tests and News based on PR comments --- Lib/test/_test_multiprocessing.py | 5 +++-- Lib/test/test_concurrent_futures.py | 6 +----- .../2023-07-16-15-02-47.gh-issue-104090.oMjNa9.rst | 4 ++-- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index d0cee161251197..3a7ece39226bc9 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -5561,8 +5561,9 @@ def _test_resource_tracker_leak_resources(self, context, delete_queue): def test_resource_tracker_exit_code(self): """ - Test the exit code of the resource tracker based on if there were left leaked resources when we stop the process. - If not leaked resources were found, exit code should be 0, otherwise 1 + Test the exit code of the resource tracker based on if there were left + leaked resources when we stop the process. If not leaked resources were + found, exit code should be 0, otherwise 1 """ for context in ["spawn", "forkserver"]: for delete_queue in [True, False]: diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 5087c48feb15aa..e50260773793a5 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -298,11 +298,7 @@ class FailingInitializerResourcesTest(unittest.TestCase): def _test(self, test_class): runner = unittest.TextTestRunner() - result = runner.run(test_class('test_initializer')) - - self.assertEqual(result.testsRun, 1) - self.assertEqual(result.failures, []) - self.assertEqual(result.errors, []) + runner.run(test_class('test_initializer')) # GH-104090: # Stop resource tracker manually now, so we can verify there are not leaked resources by checking diff --git a/Misc/NEWS.d/next/Core and Builtins/2023-07-16-15-02-47.gh-issue-104090.oMjNa9.rst b/Misc/NEWS.d/next/Core and Builtins/2023-07-16-15-02-47.gh-issue-104090.oMjNa9.rst index f48202eaf04e44..202026586dce57 100644 --- a/Misc/NEWS.d/next/Core and Builtins/2023-07-16-15-02-47.gh-issue-104090.oMjNa9.rst +++ b/Misc/NEWS.d/next/Core and Builtins/2023-07-16-15-02-47.gh-issue-104090.oMjNa9.rst @@ -1,2 +1,2 @@ -Updated Resource Tracker to return exit code based on resource leaked found -or not +The multiprocessing resource tracker now exits with status code 1 if a resource +leak was detected. It still exits with status code 0 otherwise. From bfd1e8ee65010040115d35611d01975596353fef Mon Sep 17 00:00:00 2001 From: Yonatan Bitton Date: Tue, 18 Jul 2023 13:42:36 +0300 Subject: [PATCH 08/11] Added reseting the exit code when the process should run again --- Lib/multiprocessing/resource_tracker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index c75ea6f8af0333..3d3bc99fbce72a 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -108,6 +108,7 @@ def ensure_running(self): pass self._fd = None self._pid = None + self._exitcode = None warnings.warn('resource_tracker: process died unexpectedly, ' 'relaunching. Some resources might leak.') From 043ae90b58f6f35ef36c0178849f2a0ccd25dd5b Mon Sep 17 00:00:00 2001 From: Yonatan Bitton Date: Tue, 18 Jul 2023 13:48:55 +0300 Subject: [PATCH 09/11] Added more clearing queue methods beside del (closing and calling gc.collect --- Lib/test/_test_multiprocessing.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 3a7ece39226bc9..2a6f0310f7a933 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -5549,7 +5549,11 @@ def _test_resource_tracker_leak_resources(self, context, delete_queue): # Keep it on variable, so it won't be cleared yet q = mp_context.Queue() if delete_queue: + # Clearing the queue resource to be sure explicitly with deleting + # and gc.collect + q.close() del q + gc.collect() expected_exit_code = 0 else: expected_exit_code = 1 From e258cb3a99f428fe4575a7b66caaa498c35c44a5 Mon Sep 17 00:00:00 2001 From: Yonatan Bitton Date: Thu, 31 Aug 2023 20:36:42 +0300 Subject: [PATCH 10/11] Moved FailingInitializerResourcesTest class from old path (Lib/test/test_concurrent_futures.py) to new location (Lib/test/test_concurrent_futures/test_init.py) --- Lib/test/test_concurrent_futures/test_init.py | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/Lib/test/test_concurrent_futures/test_init.py b/Lib/test/test_concurrent_futures/test_init.py index ce01e0ff0f287a..567d4fd988b4d7 100644 --- a/Lib/test/test_concurrent_futures/test_init.py +++ b/Lib/test/test_concurrent_futures/test_init.py @@ -109,6 +109,31 @@ def _assert_logged(self, msg): create_executor_tests(globals(), FailingInitializerMixin) +@unittest.skipIf(sys.platform == "win32", "Resource Tracker doesn't run on Windows") +class FailingInitializerResourcesTest(unittest.TestCase): + """ + Source: https://github.com/python/cpython/issues/104090 + """ + + def _test(self, test_class): + runner = unittest.TextTestRunner() + runner.run(test_class('test_initializer')) + + # GH-104090: + # Stop resource tracker manually now, so we can verify there are not leaked resources by checking + # the process exit code + from multiprocessing.resource_tracker import _resource_tracker + _resource_tracker._stop() + + self.assertEqual(_resource_tracker._exitcode, 0) + + def test_spawn(self): + self._test(ProcessPoolSpawnFailingInitializerTest) + + def test_forkserver(self): + self._test(ProcessPoolForkserverFailingInitializerTest) + + def setUpModule(): setup_module() From 915c8a397158f5fe397459334a1803fdf9166f6b Mon Sep 17 00:00:00 2001 From: Yonatan Bitton Date: Thu, 31 Aug 2023 20:46:05 +0300 Subject: [PATCH 11/11] Added missing import --- Lib/test/test_concurrent_futures/test_init.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Lib/test/test_concurrent_futures/test_init.py b/Lib/test/test_concurrent_futures/test_init.py index 567d4fd988b4d7..d79a6367701fb4 100644 --- a/Lib/test/test_concurrent_futures/test_init.py +++ b/Lib/test/test_concurrent_futures/test_init.py @@ -3,6 +3,7 @@ import queue import time import unittest +import sys from concurrent.futures._base import BrokenExecutor from logging.handlers import QueueHandler