From 4074b594d60c09b8f7d82425e5bd9f0f95431e62 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Fri, 13 Oct 2023 23:21:15 +0200 Subject: [PATCH 01/19] GH-110829: Ensure Thread.join() joins the OS thread --- Include/pythread.h | 8 ++ Lib/test/_test_multiprocessing.py | 19 ++-- Lib/test/test_thread.py | 99 +++++++++++++++++ Lib/threading.py | 35 +++++- Modules/_threadmodule.c | 175 ++++++++++++++++++++++++++++-- Python/thread_nt.h | 32 ++++-- Python/thread_pthread.h | 47 +++++++- 7 files changed, 381 insertions(+), 34 deletions(-) diff --git a/Include/pythread.h b/Include/pythread.h index 0784f6b2e5391f..50da70681fbb33 100644 --- a/Include/pythread.h +++ b/Include/pythread.h @@ -20,6 +20,14 @@ PyAPI_FUNC(unsigned long) PyThread_start_new_thread(void (*)(void *), void *); PyAPI_FUNC(void) _Py_NO_RETURN PyThread_exit_thread(void); PyAPI_FUNC(unsigned long) PyThread_get_thread_ident(void); +#if !defined(Py_LIMITED_API) +PyAPI_FUNC(unsigned long) PyThread_start_joinable_thread(void (*func)(void *), + void *arg, + Py_uintptr_t* handle); +PyAPI_FUNC(int) PyThread_join_thread(Py_uintptr_t); +PyAPI_FUNC(int) PyThread_detach_thread(Py_uintptr_t); +#endif + #if (defined(__APPLE__) || defined(__linux__) || defined(_WIN32) \ || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) \ || defined(__DragonFly__) || defined(_AIX)) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index bf87a3e8d6ffd8..679f354b1f721d 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2581,18 +2581,10 @@ def test_async(self): self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) def test_async_timeout(self): - p = self.Pool(3) - try: - event = threading.Event() if self.TYPE == 'threads' else None - res = p.apply_async(sqr, (6, TIMEOUT2 + support.SHORT_TIMEOUT, event)) - get = TimingWrapper(res.get) - self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) - self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) - finally: - if event is not None: - event.set() - p.terminate() - p.join() + res = self.pool.apply_async(sqr, (6, 5 * TIMEOUT2)) + get = TimingWrapper(res.get) + self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) + self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) def test_imap(self): it = self.pool.imap(sqr, list(range(10))) @@ -2693,6 +2685,9 @@ def test_make_pool(self): p.join() def test_terminate(self): + if self.TYPE == 'threads': + self.skipTest("Threads cannot be terminated") + # Simulate slow tasks which take "forever" to complete p = self.Pool(3) args = [support.LONG_TIMEOUT for i in range(10_000)] diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py index 831aaf5b6a794f..164880d65371a5 100644 --- a/Lib/test/test_thread.py +++ b/Lib/test/test_thread.py @@ -160,6 +160,105 @@ def task(): f"Exception ignored in thread started by {task!r}") self.assertIsNotNone(cm.unraisable.exc_traceback) + def test_join_thread(self): + finished = [] + + def task(): + time.sleep(0.05) + finished.append(None) + + with threading_helper.wait_threads_exit(): + joinable = True + ident = thread.start_new_thread(task, (), {}, joinable) + thread.join_thread(ident) + self.assertEqual(len(finished), 1) + + def test_join_thread_already_exited(self): + def task(): + pass + + with threading_helper.wait_threads_exit(): + joinable = True + ident = thread.start_new_thread(task, (), {}, joinable) + time.sleep(0.05) + thread.join_thread(ident) + + def test_join_non_joinable(self): + def task(): + pass + + with threading_helper.wait_threads_exit(): + ident = thread.start_new_thread(task, ()) + with self.assertRaisesRegex(ValueError, "not joinable"): + thread.join_thread(ident) + + def test_join_several_times(self): + def task(): + pass + + with threading_helper.wait_threads_exit(): + joinable = True + ident = thread.start_new_thread(task, (), {}, joinable) + thread.join_thread(ident) + with self.assertRaisesRegex(ValueError, "not joinable"): + thread.join_thread(ident) + + def test_join_from_self(self): + errors = [] + lock = thread.allocate_lock() + lock.acquire() + + def task(): + ident = thread.get_ident() + # Wait for start_new_thread() to return so that the joinable threads + # are populated with the ident, otherwise ValueError would be raised + # instead. + lock.acquire() + try: + thread.join_thread(ident) + except Exception as e: + errors.append(e) + + with threading_helper.wait_threads_exit(): + joinable = True + ident = thread.start_new_thread(task, (), {}, joinable) + lock.release() + time.sleep(0.05) + # Can still join after join_thread() failed in other thread + thread.join_thread(ident) + + assert len(errors) == 1 + with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"): + raise errors[0] + + def test_detach_then_join(self): + lock = thread.allocate_lock() + lock.acquire() + + def task(): + lock.acquire() + + with threading_helper.wait_threads_exit(): + joinable = True + ident = thread.start_new_thread(task, (), {}, joinable) + # detach_thread() returns even though the thread is blocked on lock + thread.detach_thread(ident) + # join_thread() then cannot be called anymore + with self.assertRaisesRegex(ValueError, "not joinable"): + thread.join_thread(ident) + lock.release() + + def test_join_then_detach(self): + def task(): + pass + + with threading_helper.wait_threads_exit(): + joinable = True + ident = thread.start_new_thread(task, (), {}, joinable) + thread.join_thread(ident) + with self.assertRaisesRegex(ValueError, "not joinable"): + thread.detach_thread(ident) + class Barrier: def __init__(self, num_threads): diff --git a/Lib/threading.py b/Lib/threading.py index 41c3a9ff93856f..8c23e8f313555a 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -5,6 +5,7 @@ import _thread import functools import warnings +import _weakref from time import monotonic as _time from _weakrefset import WeakSet @@ -34,6 +35,8 @@ # Rename some stuff so "from threading import *" is safe _start_new_thread = _thread.start_new_thread +_join_thread = _thread.join_thread +_detach_thread = _thread.detach_thread _daemon_threads_allowed = _thread.daemon_threads_allowed _allocate_lock = _thread.allocate_lock _set_sentinel = _thread._set_sentinel @@ -924,6 +927,7 @@ class is implemented. if _HAVE_THREAD_NATIVE_ID: self._native_id = None self._tstate_lock = None + self._join_lock = None self._started = Event() self._is_stopped = False self._initialized = True @@ -944,11 +948,14 @@ def _reset_internal_locks(self, is_alive): if self._tstate_lock is not None: self._tstate_lock._at_fork_reinit() self._tstate_lock.acquire() + if self._join_lock is not None: + self._join_lock._at_fork_reinit() else: # The thread isn't alive after fork: it doesn't have a tstate # anymore. self._is_stopped = True self._tstate_lock = None + self._join_lock = None def __repr__(self): assert self._initialized, "Thread.__init__() was not called" @@ -980,15 +987,24 @@ def start(self): if self._started.is_set(): raise RuntimeError("threads can only be started once") + self._join_lock = _allocate_lock() + with _active_limbo_lock: _limbo[self] = self try: - _start_new_thread(self._bootstrap, ()) + # Start joinable thread + _start_new_thread(self._bootstrap, (), {}, True) except Exception: with _active_limbo_lock: del _limbo[self] raise - self._started.wait() + self._started.wait() # Will set ident and native_id + + # We need to make sure the OS thread is either explicitly joined or + # detached at some point, otherwise system resources can be leaked. + def _finalizer(wr, _detach_thread=_detach_thread, ident=self._ident): + _detach_thread(ident) + self._non_joined_finalizer = _weakref.ref(self, _finalizer) def run(self): """Method representing the thread's activity. @@ -1144,6 +1160,19 @@ def join(self, timeout=None): # historically .join(timeout=x) for x<0 has acted as if timeout=0 self._wait_for_tstate_lock(timeout=max(timeout, 0)) + if self._is_stopped: + self._join_os_thread() + + def _join_os_thread(self): + join_lock = self._join_lock + if join_lock is not None: + # Calling join() multiple times simultaneously would result in early + # return for one of the callers. + with join_lock: + _join_thread(self._ident) + self._join_lock = None + self._non_joined_finalizer = None + def _wait_for_tstate_lock(self, block=True, timeout=-1): # Issue #18808: wait for the thread state to be gone. # At the end of the thread's life, after all knowledge of the thread @@ -1223,6 +1252,8 @@ def is_alive(self): if self._is_stopped or not self._started.is_set(): return False self._wait_for_tstate_lock(False) + if self._is_stopped: + self._join_os_thread() return not self._is_stopped @property diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index 9eecebddb723a4..4e6be73204f5d7 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -28,6 +28,7 @@ typedef struct { PyTypeObject *lock_type; PyTypeObject *local_type; PyTypeObject *local_dummy_type; + PyObject *joinable_dict; } thread_module_state; static inline thread_module_state* @@ -38,6 +39,25 @@ get_thread_state(PyObject *module) return (thread_module_state *)state; } +static int get_joinable_thread(thread_module_state* state, PyObject* ident_obj, + unsigned long *ident, Py_uintptr_t *handle) { + PyObject* handle_obj = PyDict_GetItemWithError(state->joinable_dict, ident_obj); + if (handle_obj == NULL) { + if (!PyErr_Occurred()) { + PyErr_SetString(PyExc_ValueError, "the given thread is not joinable"); + } + return -1; + } + *ident = PyLong_AsUnsignedLong(ident_obj); + unsigned long long ull_handle = PyLong_AsUnsignedLongLong(handle_obj); + if ((*ident == (unsigned long) -1 || ull_handle == (unsigned long long) -1) + && PyErr_Occurred()) { + // This should not occur as we control the contents of state->joinable_dict + return -1; + } + *handle = (Py_uintptr_t) ull_handle; + return 0; +} /* Lock objects */ @@ -1110,12 +1130,13 @@ Return True if daemon threads are allowed in the current interpreter,\n\ and False otherwise.\n"); static PyObject * -thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs) +thread_PyThread_start_new_thread(PyObject *module, PyObject *fargs) { - PyObject *func, *args, *kwargs = NULL; + PyObject *func, *args, *kwargs = NULL, *joinable = NULL; + thread_module_state *state = get_thread_state(module); - if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3, - &func, &args, &kwargs)) + if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 4, + &func, &args, &kwargs, &joinable)) return NULL; if (!PyCallable_Check(func)) { PyErr_SetString(PyExc_TypeError, @@ -1132,6 +1153,11 @@ thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs) "optional 3rd arg must be a dictionary"); return NULL; } + if (joinable != NULL && !PyBool_Check(joinable)) { + PyErr_SetString(PyExc_TypeError, + "optional 4th arg must be a boolean"); + return NULL; + } if (PySys_Audit("_thread.start_new_thread", "OOO", func, args, kwargs ? kwargs : Py_None) < 0) { @@ -1169,18 +1195,41 @@ thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs) boot->args = Py_NewRef(args); boot->kwargs = Py_XNewRef(kwargs); - unsigned long ident = PyThread_start_new_thread(thread_run, (void*) boot); + unsigned long ident; + Py_uintptr_t handle = 0; + if (joinable == Py_True) { + ident = PyThread_start_joinable_thread(thread_run, (void*) boot, &handle); + } else { + ident = PyThread_start_new_thread(thread_run, (void*) boot); + } if (ident == PYTHREAD_INVALID_THREAD_ID) { PyErr_SetString(ThreadError, "can't start new thread"); PyThreadState_Clear(boot->tstate); thread_bootstate_free(boot, 1); return NULL; } - return PyLong_FromUnsignedLong(ident); + PyObject* ident_obj = PyLong_FromUnsignedLong(ident); + if (ident_obj == NULL) { + return NULL; + } + if (joinable == Py_True) { + PyObject* handle_obj = PyLong_FromUnsignedLongLong(handle); + if (handle_obj == NULL) { + Py_DECREF(ident_obj); + return NULL; + } + if (PyDict_SetItem(state->joinable_dict, ident_obj, handle_obj)) { + Py_DECREF(handle_obj); + Py_DECREF(ident_obj); + return NULL; + } + Py_DECREF(handle_obj); + } + return ident_obj; } PyDoc_STRVAR(start_new_doc, -"start_new_thread(function, args[, kwargs])\n\ +"start_new_thread(function, args[, kwargs[, joinable]])\n\ (start_new() is an obsolete synonym)\n\ \n\ Start a new thread and return its identifier. The thread will call the\n\ @@ -1204,6 +1253,89 @@ PyDoc_STRVAR(exit_doc, This is synonymous to ``raise SystemExit''. It will cause the current\n\ thread to exit silently unless the exception is caught."); +static PyObject * +thread_PyThread_join_thread(PyObject *module, PyObject *ident_obj) +{ + thread_module_state *state = get_thread_state(module); + + if (!PyLong_Check(ident_obj)) { + PyErr_Format(PyExc_TypeError, "thread ident must be an int, not %s", + Py_TYPE(ident_obj)->tp_name); + return NULL; + } + // Check if the ident is part of the joinable threads and fetch its handle. + unsigned long ident; + Py_uintptr_t handle; + if (get_joinable_thread(state, ident_obj, &ident, &handle)) { + return NULL; + } + if (ident == PyThread_get_thread_ident()) { + // PyThread_join_thread() would deadlock or error out. + PyErr_SetString(ThreadError, "Cannot join current thread"); + return NULL; + } + // Before actually joining, we must first remove the ident from joinable_dict, + // as joining several times simultaneously or sequentially is undefined behavior. + if (PyDict_DelItem(state->joinable_dict, ident_obj)) { + return NULL; + } + int ret; + Py_BEGIN_ALLOW_THREADS + ret = PyThread_join_thread((Py_uintptr_t) handle); + Py_END_ALLOW_THREADS + if (ret) { + PyErr_SetString(ThreadError, "Failed joining thread"); + return NULL; + } + Py_RETURN_NONE; +} + +PyDoc_STRVAR(join_thread_doc, +"join_thread(ident)\n\ +\n\ +Join the thread with the given identifier. The thread must have been started\n\ +with start_new_thread() with the `joinable` argument set to True.\n\ +This function can only be called once per joinable thread, and it cannot be\n\ +called if detach_thread() was previously called on the same thread.\n"); + +static PyObject * +thread_PyThread_detach_thread(PyObject *module, PyObject *ident_obj) +{ + thread_module_state *state = get_thread_state(module); + + if (!PyLong_Check(ident_obj)) { + PyErr_Format(PyExc_TypeError, "thread ident must be an int, not %s", + Py_TYPE(ident_obj)->tp_name); + return NULL; + } + unsigned long ident; + Py_uintptr_t handle; + if (get_joinable_thread(state, ident_obj, &ident, &handle)) { + return NULL; + } + if (PyDict_DelItem(state->joinable_dict, ident_obj)) { + if (PyErr_ExceptionMatches(PyExc_KeyError)) { + PyErr_SetString(PyExc_ValueError, + "the given thread is not joinable and thus cannot be detached"); + } + return NULL; + } + int ret = PyThread_detach_thread(handle); + if (ret) { + PyErr_SetString(ThreadError, "Failed detaching thread"); + return NULL; + } + Py_RETURN_NONE; +} + +PyDoc_STRVAR(detach_thread_doc, +"detach_thread(ident)\n\ +\n\ +Detach the thread with the given identifier. The thread must have been started\n\ +with start_new_thread() with the `joinable` argument set to True.\n\ +This function can only be called once per joinable thread, and it cannot be\n\ +called if join_thread() was previously called on the same thread.\n"); + static PyObject * thread_PyThread_interrupt_main(PyObject *self, PyObject *args) { @@ -1574,6 +1706,10 @@ static PyMethodDef thread_methods[] = { METH_VARARGS, start_new_doc}, {"start_new", (PyCFunction)thread_PyThread_start_new_thread, METH_VARARGS, start_new_doc}, + {"join_thread", (PyCFunction)thread_PyThread_join_thread, + METH_O, join_thread_doc}, + {"detach_thread", (PyCFunction)thread_PyThread_detach_thread, + METH_O, detach_thread_doc}, {"daemon_threads_allowed", (PyCFunction)thread_daemon_threads_allowed, METH_NOARGS, daemon_threads_allowed_doc}, {"allocate_lock", thread_PyThread_allocate_lock, @@ -1652,6 +1788,12 @@ thread_module_exec(PyObject *module) return -1; } + // Dict of joinable threads: ident -> handle + state->joinable_dict = PyDict_New(); + if (state->joinable_dict == NULL) { + return -1; + } + // Add module attributes if (PyDict_SetItemString(d, "error", ThreadError) < 0) { return -1; @@ -1690,6 +1832,7 @@ thread_module_traverse(PyObject *module, visitproc visit, void *arg) Py_VISIT(state->lock_type); Py_VISIT(state->local_type); Py_VISIT(state->local_dummy_type); + Py_VISIT(state->joinable_dict); return 0; } @@ -1701,6 +1844,24 @@ thread_module_clear(PyObject *module) Py_CLEAR(state->lock_type); Py_CLEAR(state->local_type); Py_CLEAR(state->local_dummy_type); + // To avoid resource leaks, detach all still joinable threads + if (state->joinable_dict != NULL) { + PyObject *key, *value; + Py_ssize_t pos = 0; + + while (PyDict_Next(state->joinable_dict, &pos, &key, &value)) { + if (PyLong_Check(value)) { + unsigned long long handle = PyLong_AsUnsignedLongLong(value); + if (handle == (unsigned long long) -1 && PyErr_Occurred()) { + // Should not happen + PyErr_Clear(); + } else { + PyThread_detach_thread((Py_uintptr_t) handle); + } + } + } + Py_CLEAR(state->joinable_dict); + } return 0; } diff --git a/Python/thread_nt.h b/Python/thread_nt.h index 26f441bd6d3c56..8dbe78e42e3edf 100644 --- a/Python/thread_nt.h +++ b/Python/thread_nt.h @@ -183,8 +183,7 @@ bootstrap(void *call) } unsigned long -PyThread_start_new_thread(void (*func)(void *), void *arg) -{ +PyThread_start_joinable_thread(void (*func)(void *), void *arg, Py_uintptr_t* handle) { HANDLE hThread; unsigned threadID; callobj *obj; @@ -207,16 +206,35 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) /* I've seen errno == EAGAIN here, which means "there are * too many threads". */ - int e = errno; - threadID = (unsigned)-1; HeapFree(GetProcessHeap(), 0, obj); + return PYTHREAD_INVALID_THREAD_ID; } - else { - CloseHandle(hThread); - } + *handle = (Py_uintptr_t) hThread; + return threadID; +} + +unsigned long +PyThread_start_new_thread(void (*func)(void *), void *arg) { + Py_uintptr_t handle; + unsigned long threadID = PyThread_start_joinable_thread(func, arg, &handle); + CloseHandle((HANDLE) handle); return threadID; } +int +PyThread_join_thread(Py_uintptr_t handle) { + HANDLE hThread = (HANDLE) handle; + int errored = (WaitForSingleObject(hThread, INFINITE) != WAIT_OBJECT_0); + CloseHandle(hThread); + return errored; +} + +int +PyThread_detach_thread(Py_uintptr_t handle) { + HANDLE hThread = (HANDLE) handle; + return (CloseHandle(hThread) == 0); +} + /* * Return the thread Id instead of a handle. The Id is said to uniquely identify the * thread in the system diff --git a/Python/thread_pthread.h b/Python/thread_pthread.h index 76a1f7763f23b9..5d08429c36dedc 100644 --- a/Python/thread_pthread.h +++ b/Python/thread_pthread.h @@ -235,8 +235,8 @@ pythread_wrapper(void *arg) return NULL; } -unsigned long -PyThread_start_new_thread(void (*func)(void *), void *arg) +static int +do_start_joinable_thread(void (*func)(void *), void *arg, pthread_t* out_id) { pthread_t th; int status; @@ -252,7 +252,7 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) #if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED) if (pthread_attr_init(&attrs) != 0) - return PYTHREAD_INVALID_THREAD_ID; + return -1; #endif #if defined(THREAD_STACK_SIZE) PyThreadState *tstate = _PyThreadState_GET(); @@ -261,7 +261,7 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) if (tss != 0) { if (pthread_attr_setstacksize(&attrs, tss) != 0) { pthread_attr_destroy(&attrs); - return PYTHREAD_INVALID_THREAD_ID; + return -1; } } #endif @@ -272,7 +272,7 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) pythread_callback *callback = PyMem_RawMalloc(sizeof(pythread_callback)); if (callback == NULL) { - return PYTHREAD_INVALID_THREAD_ID; + return -1; } callback->func = func; @@ -292,11 +292,36 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) if (status != 0) { PyMem_RawFree(callback); + return -1; + } + *out_id = th; + return 0; +} + +unsigned long +PyThread_start_joinable_thread(void (*func)(void *), void *arg, Py_uintptr_t* handle) { + pthread_t th = (pthread_t) 0; + if (do_start_joinable_thread(func, arg, &th)) { return PYTHREAD_INVALID_THREAD_ID; } + assert(th == (pthread_t) (unsigned long) th); + assert(th == (pthread_t) (Py_uintptr_t) th); + *handle = (Py_uintptr_t) th; +#if SIZEOF_PTHREAD_T <= SIZEOF_LONG + return (unsigned long) th; +#else + return (unsigned long) *(unsigned long *) &th; +#endif +} +unsigned long +PyThread_start_new_thread(void (*func)(void *), void *arg) +{ + pthread_t th = (pthread_t) 0; + if (do_start_joinable_thread(func, arg, &th)) { + return PYTHREAD_INVALID_THREAD_ID; + } pthread_detach(th); - #if SIZEOF_PTHREAD_T <= SIZEOF_LONG return (unsigned long) th; #else @@ -304,6 +329,16 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) #endif } +int +PyThread_join_thread(Py_uintptr_t th) { + return pthread_join((pthread_t) th, NULL); +} + +int +PyThread_detach_thread(Py_uintptr_t th) { + return pthread_detach((pthread_t) th); +} + /* XXX This implementation is considered (to quote Tim Peters) "inherently hosed" because: - It does not guarantee the promise that a non-zero integer is returned. From 27ae38b8bfd5898dd07e23ec264895cbb2c7e26d Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 14 Oct 2023 09:26:51 +0200 Subject: [PATCH 02/19] Fix race condition in test_join_from_self --- Lib/test/test_thread.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py index 164880d65371a5..def53f7231bf99 100644 --- a/Lib/test/test_thread.py +++ b/Lib/test/test_thread.py @@ -205,26 +205,30 @@ def task(): def test_join_from_self(self): errors = [] - lock = thread.allocate_lock() - lock.acquire() + start_new_thread_returned = thread.allocate_lock() + start_new_thread_returned.acquire() + task_tried_to_join = thread.allocate_lock() + task_tried_to_join.acquire() def task(): ident = thread.get_ident() # Wait for start_new_thread() to return so that the joinable threads # are populated with the ident, otherwise ValueError would be raised # instead. - lock.acquire() + start_new_thread_returned.acquire() try: thread.join_thread(ident) except Exception as e: errors.append(e) + finally: + task_tried_to_join.release() with threading_helper.wait_threads_exit(): joinable = True ident = thread.start_new_thread(task, (), {}, joinable) - lock.release() - time.sleep(0.05) + start_new_thread_returned.release() # Can still join after join_thread() failed in other thread + task_tried_to_join.acquire() thread.join_thread(ident) assert len(errors) == 1 From 31bf2f328a968edfbdf11d55af9536e4ad3ce6a7 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 14 Oct 2023 09:45:41 +0200 Subject: [PATCH 03/19] Improve internal docs --- Include/pythread.h | 11 +++++++++++ Modules/_threadmodule.c | 18 ++++++++++++------ 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/Include/pythread.h b/Include/pythread.h index 50da70681fbb33..d319ee14f65867 100644 --- a/Include/pythread.h +++ b/Include/pythread.h @@ -21,6 +21,17 @@ PyAPI_FUNC(void) _Py_NO_RETURN PyThread_exit_thread(void); PyAPI_FUNC(unsigned long) PyThread_get_thread_ident(void); #if !defined(Py_LIMITED_API) +/* Thread joining APIs. + * + * These APIs have a strict contract: + * - Either PyThread_join_thread or PyThread_detach_thread must be called + * exactly once with the given handle. + * - Calling neither PyThread_join_thread nor PyThread_detach_thread results + * in a resource leak until the end of the process. + * - Any other usage, such as calling both PyThread_join_thread and + * PyThread_detach_thread, or calling them more than once (including + * simultaneously), results in undefined behavior. + */ PyAPI_FUNC(unsigned long) PyThread_start_joinable_thread(void (*func)(void *), void *arg, Py_uintptr_t* handle); diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index 4e6be73204f5d7..d46972c153f396 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -1232,12 +1232,18 @@ PyDoc_STRVAR(start_new_doc, "start_new_thread(function, args[, kwargs[, joinable]])\n\ (start_new() is an obsolete synonym)\n\ \n\ -Start a new thread and return its identifier. The thread will call the\n\ -function with positional arguments from the tuple args and keyword arguments\n\ -taken from the optional dictionary kwargs. The thread exits when the\n\ -function returns; the return value is ignored. The thread will also exit\n\ -when the function raises an unhandled exception; a stack trace will be\n\ -printed unless the exception is SystemExit.\n"); +Start a new thread and return its identifier.\n\ +\n\ +The thread will call the function with positional arguments from the\n\ +tuple args and keyword arguments taken from the optional dictionary\n\ +kwargs. The thread exits when the function returns; the return value\n\ +is ignored. The thread will also exit when the function raises an\n\ +unhandled exception; a stack trace will be printed unless the exception\n\ +is SystemExit.\n\ +If the optional joinable argument is True, then the thread must later\n\ +be joined with join_thread() or detached with detach_thread().\n\ +Failure to do so results in a system resource leak until interpreter\n\ +shutdown.\n"); static PyObject * thread_PyThread_exit_thread(PyObject *self, PyObject *Py_UNUSED(ignored)) From 192a741a0d412aaf0bb85e658de7f0525c434ce0 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 14 Oct 2023 10:38:53 +0200 Subject: [PATCH 04/19] Move API declarations to Include/cpython --- Include/cpython/pythread.h | 17 +++++++++++++++++ Include/pythread.h | 19 ------------------- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/Include/cpython/pythread.h b/Include/cpython/pythread.h index 03f710a9f7ef2e..e5967bedbbb4a8 100644 --- a/Include/cpython/pythread.h +++ b/Include/cpython/pythread.h @@ -2,6 +2,23 @@ # error "this header file must not be included directly" #endif +/* Thread joining APIs. + * + * These APIs have a strict contract: + * - Either PyThread_join_thread or PyThread_detach_thread must be called + * exactly once with the given handle. + * - Calling neither PyThread_join_thread nor PyThread_detach_thread results + * in a resource leak until the end of the process. + * - Any other usage, such as calling both PyThread_join_thread and + * PyThread_detach_thread, or calling them more than once (including + * simultaneously), results in undefined behavior. + */ +PyAPI_FUNC(unsigned long) PyThread_start_joinable_thread(void (*func)(void *), + void *arg, + Py_uintptr_t* handle); +PyAPI_FUNC(int) PyThread_join_thread(Py_uintptr_t); +PyAPI_FUNC(int) PyThread_detach_thread(Py_uintptr_t); + // PY_TIMEOUT_MAX is the highest usable value (in microseconds) of PY_TIMEOUT_T // type, and depends on the system threading API. // diff --git a/Include/pythread.h b/Include/pythread.h index d319ee14f65867..0784f6b2e5391f 100644 --- a/Include/pythread.h +++ b/Include/pythread.h @@ -20,25 +20,6 @@ PyAPI_FUNC(unsigned long) PyThread_start_new_thread(void (*)(void *), void *); PyAPI_FUNC(void) _Py_NO_RETURN PyThread_exit_thread(void); PyAPI_FUNC(unsigned long) PyThread_get_thread_ident(void); -#if !defined(Py_LIMITED_API) -/* Thread joining APIs. - * - * These APIs have a strict contract: - * - Either PyThread_join_thread or PyThread_detach_thread must be called - * exactly once with the given handle. - * - Calling neither PyThread_join_thread nor PyThread_detach_thread results - * in a resource leak until the end of the process. - * - Any other usage, such as calling both PyThread_join_thread and - * PyThread_detach_thread, or calling them more than once (including - * simultaneously), results in undefined behavior. - */ -PyAPI_FUNC(unsigned long) PyThread_start_joinable_thread(void (*func)(void *), - void *arg, - Py_uintptr_t* handle); -PyAPI_FUNC(int) PyThread_join_thread(Py_uintptr_t); -PyAPI_FUNC(int) PyThread_detach_thread(Py_uintptr_t); -#endif - #if (defined(__APPLE__) || defined(__linux__) || defined(_WIN32) \ || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) \ || defined(__DragonFly__) || defined(_AIX)) From e9c1e7aa69c9c59c44a8b19d260d3788a7241e43 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 14 Oct 2023 11:56:52 +0200 Subject: [PATCH 05/19] Add required stub for pthread_join --- Include/cpython/pthread_stubs.h | 1 + Python/thread_pthread_stubs.h | 9 +++++++++ 2 files changed, 10 insertions(+) diff --git a/Include/cpython/pthread_stubs.h b/Include/cpython/pthread_stubs.h index 5246968ea05476..e542eaa5bff0cf 100644 --- a/Include/cpython/pthread_stubs.h +++ b/Include/cpython/pthread_stubs.h @@ -83,6 +83,7 @@ PyAPI_FUNC(int) pthread_create(pthread_t *restrict thread, void *(*start_routine)(void *), void *restrict arg); PyAPI_FUNC(int) pthread_detach(pthread_t thread); +PyAPI_FUNC(int) pthread_join(pthread_t thread, void** value_ptr); PyAPI_FUNC(pthread_t) pthread_self(void); PyAPI_FUNC(int) pthread_exit(void *retval) __attribute__ ((__noreturn__)); PyAPI_FUNC(int) pthread_attr_init(pthread_attr_t *attr); diff --git a/Python/thread_pthread_stubs.h b/Python/thread_pthread_stubs.h index 48bad36ec449ab..4741e594e52e65 100644 --- a/Python/thread_pthread_stubs.h +++ b/Python/thread_pthread_stubs.h @@ -94,6 +94,15 @@ pthread_detach(pthread_t thread) return 0; } +int +pthread_join(pthread_t thread, void** value_ptr) +{ + if (value_ptr) { + *value_ptr = NULL; + } + return 0; +} + PyAPI_FUNC(pthread_t) pthread_self(void) { return 0; From 70a6855c766da891a12055f18a16928fafa5ce6c Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 16 Oct 2023 22:04:38 +0200 Subject: [PATCH 06/19] Mention that the new PyThread functions are uninterruptible. --- Include/cpython/pythread.h | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/Include/cpython/pythread.h b/Include/cpython/pythread.h index e5967bedbbb4a8..17b7177a7be320 100644 --- a/Include/cpython/pythread.h +++ b/Include/cpython/pythread.h @@ -16,7 +16,18 @@ PyAPI_FUNC(unsigned long) PyThread_start_joinable_thread(void (*func)(void *), void *arg, Py_uintptr_t* handle); +/* + * Join a thread started with `PyThread_start_joinable_thread`. + * This function cannot be interrupted. It returns 0 on success, + * a non-zero value on failure. + */ PyAPI_FUNC(int) PyThread_join_thread(Py_uintptr_t); +/* + * Detach a thread started with `PyThread_start_joinable_thread`, such + * that its resources are relased as soon as it exits. + * This function cannot be interrupted. It returns 0 on success, + * a non-zero value on failure. + */ PyAPI_FUNC(int) PyThread_detach_thread(Py_uintptr_t); // PY_TIMEOUT_MAX is the highest usable value (in microseconds) of PY_TIMEOUT_T From d61fd5eedb0be39934bde6ab3d1023a88a50fc0b Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 16 Oct 2023 22:29:37 +0200 Subject: [PATCH 07/19] Add test for detaching from self --- Lib/test/test_thread.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py index def53f7231bf99..5ee1980216c0bc 100644 --- a/Lib/test/test_thread.py +++ b/Lib/test/test_thread.py @@ -235,6 +235,33 @@ def task(): with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"): raise errors[0] + def test_detach_from_self(self): + errors = [] + start_new_thread_returned = thread.allocate_lock() + start_new_thread_returned.acquire() + thread_detached = thread.allocate_lock() + thread_detached.acquire() + + def task(): + ident = thread.get_ident() + start_new_thread_returned.acquire() + try: + thread.detach_thread(ident) + except Exception as e: + errors.append(e) + finally: + thread_detached.release() + + with threading_helper.wait_threads_exit(): + joinable = True + ident = thread.start_new_thread(task, (), {}, joinable) + start_new_thread_returned.release() + thread_detached.acquire() + with self.assertRaisesRegex(ValueError, "not joinable"): + thread.join_thread(ident) + + assert len(errors) == 0 + def test_detach_then_join(self): lock = thread.allocate_lock() lock.acquire() From 751675ea5732f1d06481360ac14dca81a2084bab Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 16 Oct 2023 22:55:57 +0200 Subject: [PATCH 08/19] Fix race condition in Thread.join() --- Lib/test/test_threading.py | 26 ++++++++++++++++++++++++++ Lib/threading.py | 14 ++++++++------ 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 00a64372b394dc..21e8c880c049dc 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -482,6 +482,32 @@ def test_enumerate_after_join(self): finally: sys.setswitchinterval(old_interval) + def test_join_from_multiple_threads(self): + # Thread.join() should be thread-safe + errors = [] + + def worker(): + time.sleep(0.005) + + def joiner(thread): + try: + thread.join() + except Exception as e: + errors.append(e) + + for N in range(2, 20): + threads = [threading.Thread(target=worker)] + for i in range(N): + threads.append(threading.Thread(target=joiner, + args=(threads[0],))) + for t in threads: + t.start() + time.sleep(0.01) + for t in threads: + t.join() + if errors: + raise errors[0] + def test_no_refcycle_through_target(self): class RunSelfFunction(object): def __init__(self, should_raise): diff --git a/Lib/threading.py b/Lib/threading.py index 8c23e8f313555a..faa0a6d7ce263e 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -1165,13 +1165,15 @@ def join(self, timeout=None): def _join_os_thread(self): join_lock = self._join_lock - if join_lock is not None: - # Calling join() multiple times simultaneously would result in early - # return for one of the callers. - with join_lock: + if join_lock is None: + return + with join_lock: + # Calling join() multiple times simultaneously would raise + # an exception in one of the callers. + if self._join_lock is not None: _join_thread(self._ident) - self._join_lock = None - self._non_joined_finalizer = None + self._join_lock = None + self._non_joined_finalizer = None def _wait_for_tstate_lock(self, block=True, timeout=-1): # Issue #18808: wait for the thread state to be gone. From 235544e5cb24a9870ebbb0128c3a56ff10e6e7d1 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Fri, 27 Oct 2023 13:22:38 +0200 Subject: [PATCH 09/19] Make new PyThread APIs internal --- Include/cpython/pythread.h | 28 ---------------------------- Include/internal/pycore_pythread.h | 27 +++++++++++++++++++++++++++ 2 files changed, 27 insertions(+), 28 deletions(-) diff --git a/Include/cpython/pythread.h b/Include/cpython/pythread.h index 17b7177a7be320..03f710a9f7ef2e 100644 --- a/Include/cpython/pythread.h +++ b/Include/cpython/pythread.h @@ -2,34 +2,6 @@ # error "this header file must not be included directly" #endif -/* Thread joining APIs. - * - * These APIs have a strict contract: - * - Either PyThread_join_thread or PyThread_detach_thread must be called - * exactly once with the given handle. - * - Calling neither PyThread_join_thread nor PyThread_detach_thread results - * in a resource leak until the end of the process. - * - Any other usage, such as calling both PyThread_join_thread and - * PyThread_detach_thread, or calling them more than once (including - * simultaneously), results in undefined behavior. - */ -PyAPI_FUNC(unsigned long) PyThread_start_joinable_thread(void (*func)(void *), - void *arg, - Py_uintptr_t* handle); -/* - * Join a thread started with `PyThread_start_joinable_thread`. - * This function cannot be interrupted. It returns 0 on success, - * a non-zero value on failure. - */ -PyAPI_FUNC(int) PyThread_join_thread(Py_uintptr_t); -/* - * Detach a thread started with `PyThread_start_joinable_thread`, such - * that its resources are relased as soon as it exits. - * This function cannot be interrupted. It returns 0 on success, - * a non-zero value on failure. - */ -PyAPI_FUNC(int) PyThread_detach_thread(Py_uintptr_t); - // PY_TIMEOUT_MAX is the highest usable value (in microseconds) of PY_TIMEOUT_T // type, and depends on the system threading API. // diff --git a/Include/internal/pycore_pythread.h b/Include/internal/pycore_pythread.h index d31ffc78130534..80541b1c20c899 100644 --- a/Include/internal/pycore_pythread.h +++ b/Include/internal/pycore_pythread.h @@ -106,6 +106,33 @@ PyAPI_FUNC(PyLockStatus) PyThread_acquire_lock_timed_with_retries( PyThread_type_lock, PY_TIMEOUT_T microseconds); +/* Thread joining APIs. + * + * These APIs have a strict contract: + * - Either PyThread_join_thread or PyThread_detach_thread must be called + * exactly once with the given handle. + * - Calling neither PyThread_join_thread nor PyThread_detach_thread results + * in a resource leak until the end of the process. + * - Any other usage, such as calling both PyThread_join_thread and + * PyThread_detach_thread, or calling them more than once (including + * simultaneously), results in undefined behavior. + */ +PyAPI_FUNC(unsigned long) PyThread_start_joinable_thread(void (*func)(void *), + void *arg, + Py_uintptr_t* handle); +/* + * Join a thread started with `PyThread_start_joinable_thread`. + * This function cannot be interrupted. It returns 0 on success, + * a non-zero value on failure. + */ +PyAPI_FUNC(int) PyThread_join_thread(Py_uintptr_t); +/* + * Detach a thread started with `PyThread_start_joinable_thread`, such + * that its resources are relased as soon as it exits. + * This function cannot be interrupted. It returns 0 on success, + * a non-zero value on failure. + */ +PyAPI_FUNC(int) PyThread_detach_thread(Py_uintptr_t); #ifdef __cplusplus } From 1a58a24765b96d5e4370a4598e68cf1c2f924cbc Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Fri, 27 Oct 2023 13:24:47 +0200 Subject: [PATCH 10/19] Avoid closing invalid handle on Windows. --- Python/thread_nt.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Python/thread_nt.h b/Python/thread_nt.h index 8dbe78e42e3edf..fba5078ea47cda 100644 --- a/Python/thread_nt.h +++ b/Python/thread_nt.h @@ -217,7 +217,9 @@ unsigned long PyThread_start_new_thread(void (*func)(void *), void *arg) { Py_uintptr_t handle; unsigned long threadID = PyThread_start_joinable_thread(func, arg, &handle); - CloseHandle((HANDLE) handle); + if (handle) { + CloseHandle((HANDLE) handle); + } return threadID; } From 47809e85d09011eaf46d62d5f7dd6a8edf490b12 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Fri, 27 Oct 2023 13:34:43 +0200 Subject: [PATCH 11/19] Add test for Thread.join(timeout) --- Lib/test/test_threading.py | 15 +++++++++++++++ Lib/threading.py | 7 ++++--- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 21e8c880c049dc..3f4ef9b30ee532 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -508,6 +508,21 @@ def joiner(thread): if errors: raise errors[0] + def test_join_with_timeout(self): + lock = _thread.allocate_lock() + lock.acquire() + + def worker(): + lock.acquire() + + thread = threading.Thread(target=worker) + thread.start() + thread.join(timeout=0.01) + assert thread.is_alive() + lock.release() + thread.join() + assert not thread.is_alive() + def test_no_refcycle_through_target(self): class RunSelfFunction(object): def __init__(self, should_raise): diff --git a/Lib/threading.py b/Lib/threading.py index faa0a6d7ce263e..147113c32a6f97 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -1254,9 +1254,10 @@ def is_alive(self): if self._is_stopped or not self._started.is_set(): return False self._wait_for_tstate_lock(False) - if self._is_stopped: - self._join_os_thread() - return not self._is_stopped + if not self._is_stopped: + return True + self._join_os_thread() + return False @property def daemon(self): From 70d3e51bce3314be474513c7531e39902251b3ea Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Fri, 27 Oct 2023 15:15:19 +0200 Subject: [PATCH 12/19] Switch to standalone start_joinable_thread() function and rich thread handle object --- Lib/test/audit-tests.py | 3 + Lib/test/test_audit.py | 2 + .../test_process_pool.py | 6 +- Lib/test/test_thread.py | 94 ++-- Lib/test/test_threading.py | 6 +- Lib/threading.py | 24 +- Modules/_threadmodule.c | 408 ++++++++++-------- 7 files changed, 288 insertions(+), 255 deletions(-) diff --git a/Lib/test/audit-tests.py b/Lib/test/audit-tests.py index 89f407de4b0d9c..ce4a11b119c900 100644 --- a/Lib/test/audit-tests.py +++ b/Lib/test/audit-tests.py @@ -455,6 +455,9 @@ def __call__(self): i = _thread.start_new_thread(test_func(), ()) lock.acquire() + handle = _thread.start_joinable_thread(test_func()) + handle.join() + def test_threading_abort(): # Ensures that aborting PyThreadState_New raises the correct exception diff --git a/Lib/test/test_audit.py b/Lib/test/test_audit.py index 47e5832d311bd1..cd0a4e2264865d 100644 --- a/Lib/test/test_audit.py +++ b/Lib/test/test_audit.py @@ -209,6 +209,8 @@ def test_threading(self): expected = [ ("_thread.start_new_thread", "(, (), None)"), ("test.test_func", "()"), + ("_thread.start_joinable_thread", "(,)"), + ("test.test_func", "()"), ] self.assertEqual(actual, expected) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index c73c2da1a01088..3e61b0c9387c6f 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -194,11 +194,11 @@ def test_python_finalization_error(self): context = self.get_context() - # gh-109047: Mock the threading.start_new_thread() function to inject + # gh-109047: Mock the threading.start_joinable_thread() function to inject # RuntimeError: simulate the error raised during Python finalization. # Block the second creation: create _ExecutorManagerThread, but block # QueueFeederThread. - orig_start_new_thread = threading._start_new_thread + orig_start_new_thread = threading._start_joinable_thread nthread = 0 def mock_start_new_thread(func, *args): nonlocal nthread @@ -208,7 +208,7 @@ def mock_start_new_thread(func, *args): nthread += 1 return orig_start_new_thread(func, *args) - with support.swap_attr(threading, '_start_new_thread', + with support.swap_attr(threading, '_start_joinable_thread', mock_start_new_thread): executor = self.executor_type(max_workers=2, mp_context=context) with executor: diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py index 5ee1980216c0bc..931cb4b797e0b2 100644 --- a/Lib/test/test_thread.py +++ b/Lib/test/test_thread.py @@ -165,71 +165,69 @@ def test_join_thread(self): def task(): time.sleep(0.05) - finished.append(None) + finished.append(thread.get_ident()) with threading_helper.wait_threads_exit(): - joinable = True - ident = thread.start_new_thread(task, (), {}, joinable) - thread.join_thread(ident) + handle = thread.start_joinable_thread(task) + handle.join() self.assertEqual(len(finished), 1) + self.assertEqual(handle.ident, finished[0]) def test_join_thread_already_exited(self): def task(): pass with threading_helper.wait_threads_exit(): - joinable = True - ident = thread.start_new_thread(task, (), {}, joinable) + handle = thread.start_joinable_thread(task) time.sleep(0.05) - thread.join_thread(ident) + handle.join() - def test_join_non_joinable(self): + def test_join_several_times(self): def task(): pass with threading_helper.wait_threads_exit(): - ident = thread.start_new_thread(task, ()) + handle = thread.start_joinable_thread(task) + handle.join() with self.assertRaisesRegex(ValueError, "not joinable"): - thread.join_thread(ident) + handle.join() + + def test_joinable_not_joined(self): + handle_destroyed = thread.allocate_lock() + handle_destroyed.acquire() - def test_join_several_times(self): def task(): - pass + handle_destroyed.acquire() with threading_helper.wait_threads_exit(): - joinable = True - ident = thread.start_new_thread(task, (), {}, joinable) - thread.join_thread(ident) - with self.assertRaisesRegex(ValueError, "not joinable"): - thread.join_thread(ident) + handle = thread.start_joinable_thread(task) + del handle + handle_destroyed.release() def test_join_from_self(self): errors = [] - start_new_thread_returned = thread.allocate_lock() - start_new_thread_returned.acquire() + handles = [] + start_joinable_thread_returned = thread.allocate_lock() + start_joinable_thread_returned.acquire() task_tried_to_join = thread.allocate_lock() task_tried_to_join.acquire() def task(): - ident = thread.get_ident() - # Wait for start_new_thread() to return so that the joinable threads - # are populated with the ident, otherwise ValueError would be raised - # instead. - start_new_thread_returned.acquire() + start_joinable_thread_returned.acquire() try: - thread.join_thread(ident) + handles[0].join() except Exception as e: errors.append(e) finally: task_tried_to_join.release() with threading_helper.wait_threads_exit(): - joinable = True - ident = thread.start_new_thread(task, (), {}, joinable) - start_new_thread_returned.release() - # Can still join after join_thread() failed in other thread + handle = thread.start_joinable_thread(task) + handles.append(handle) + start_joinable_thread_returned.release() + # Can still join after joining failed in other thread task_tried_to_join.acquire() - thread.join_thread(ident) + handle.join() assert len(errors) == 1 with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"): @@ -237,28 +235,28 @@ def task(): def test_detach_from_self(self): errors = [] - start_new_thread_returned = thread.allocate_lock() - start_new_thread_returned.acquire() + handles = [] + start_joinable_thread_returned = thread.allocate_lock() + start_joinable_thread_returned.acquire() thread_detached = thread.allocate_lock() thread_detached.acquire() def task(): - ident = thread.get_ident() - start_new_thread_returned.acquire() + start_joinable_thread_returned.acquire() try: - thread.detach_thread(ident) + handles[0].detach() except Exception as e: errors.append(e) finally: thread_detached.release() with threading_helper.wait_threads_exit(): - joinable = True - ident = thread.start_new_thread(task, (), {}, joinable) - start_new_thread_returned.release() + handle = thread.start_joinable_thread(task) + handles.append(handle) + start_joinable_thread_returned.release() thread_detached.acquire() with self.assertRaisesRegex(ValueError, "not joinable"): - thread.join_thread(ident) + handle.join() assert len(errors) == 0 @@ -270,13 +268,12 @@ def task(): lock.acquire() with threading_helper.wait_threads_exit(): - joinable = True - ident = thread.start_new_thread(task, (), {}, joinable) - # detach_thread() returns even though the thread is blocked on lock - thread.detach_thread(ident) - # join_thread() then cannot be called anymore + handle = thread.start_joinable_thread(task) + # detach() returns even though the thread is blocked on lock + handle.detach() + # join() then cannot be called anymore with self.assertRaisesRegex(ValueError, "not joinable"): - thread.join_thread(ident) + handle.join() lock.release() def test_join_then_detach(self): @@ -284,11 +281,10 @@ def task(): pass with threading_helper.wait_threads_exit(): - joinable = True - ident = thread.start_new_thread(task, (), {}, joinable) - thread.join_thread(ident) + handle = thread.start_joinable_thread(task) + handle.join() with self.assertRaisesRegex(ValueError, "not joinable"): - thread.detach_thread(ident) + handle.detach() class Barrier: diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 3f4ef9b30ee532..146e2dbc0fc396 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -376,8 +376,8 @@ def test_limbo_cleanup(self): # Issue 7481: Failure to start thread should cleanup the limbo map. def fail_new_thread(*args): raise threading.ThreadError() - _start_new_thread = threading._start_new_thread - threading._start_new_thread = fail_new_thread + _start_joinable_thread = threading._start_joinable_thread + threading._start_joinable_thread = fail_new_thread try: t = threading.Thread(target=lambda: None) self.assertRaises(threading.ThreadError, t.start) @@ -385,7 +385,7 @@ def fail_new_thread(*args): t in threading._limbo, "Failed to cleanup _limbo map on failure of Thread.start().") finally: - threading._start_new_thread = _start_new_thread + threading._start_joinable_thread = _start_joinable_thread def test_finalize_running_thread(self): # Issue 1402: the PyGILState_Ensure / _Release functions may be called diff --git a/Lib/threading.py b/Lib/threading.py index 147113c32a6f97..6711f7f9cf1e97 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -34,9 +34,7 @@ 'setprofile_all_threads','settrace_all_threads'] # Rename some stuff so "from threading import *" is safe -_start_new_thread = _thread.start_new_thread -_join_thread = _thread.join_thread -_detach_thread = _thread.detach_thread +_start_joinable_thread = _thread.start_joinable_thread _daemon_threads_allowed = _thread.daemon_threads_allowed _allocate_lock = _thread.allocate_lock _set_sentinel = _thread._set_sentinel @@ -928,6 +926,7 @@ class is implemented. self._native_id = None self._tstate_lock = None self._join_lock = None + self._handle = None self._started = Event() self._is_stopped = False self._initialized = True @@ -993,19 +992,13 @@ def start(self): _limbo[self] = self try: # Start joinable thread - _start_new_thread(self._bootstrap, (), {}, True) + self._handle = _start_joinable_thread(self._bootstrap) except Exception: with _active_limbo_lock: del _limbo[self] raise self._started.wait() # Will set ident and native_id - # We need to make sure the OS thread is either explicitly joined or - # detached at some point, otherwise system resources can be leaked. - def _finalizer(wr, _detach_thread=_detach_thread, ident=self._ident): - _detach_thread(ident) - self._non_joined_finalizer = _weakref.ref(self, _finalizer) - def run(self): """Method representing the thread's activity. @@ -1168,12 +1161,13 @@ def _join_os_thread(self): if join_lock is None: return with join_lock: - # Calling join() multiple times simultaneously would raise - # an exception in one of the callers. - if self._join_lock is not None: - _join_thread(self._ident) + # Calling join() multiple times would raise an exception + # in one of the callers. + if self._handle is not None: + self._handle.join() + self._handle = None + # No need to keep this around self._join_lock = None - self._non_joined_finalizer = None def _wait_for_tstate_lock(self, block=True, timeout=-1): # Issue #18808: wait for the thread state to be gone. diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index d46972c153f396..5b8e5ecb76b50c 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -22,13 +22,13 @@ // Forward declarations static struct PyModuleDef thread_module; - +// Module state typedef struct { PyTypeObject *excepthook_type; PyTypeObject *lock_type; PyTypeObject *local_type; PyTypeObject *local_dummy_type; - PyObject *joinable_dict; + PyTypeObject *thread_handle_type; } thread_module_state; static inline thread_module_state* @@ -39,26 +39,128 @@ get_thread_state(PyObject *module) return (thread_module_state *)state; } -static int get_joinable_thread(thread_module_state* state, PyObject* ident_obj, - unsigned long *ident, Py_uintptr_t *handle) { - PyObject* handle_obj = PyDict_GetItemWithError(state->joinable_dict, ident_obj); - if (handle_obj == NULL) { - if (!PyErr_Occurred()) { - PyErr_SetString(PyExc_ValueError, "the given thread is not joinable"); +// _ThreadHandle type + +typedef struct { + PyObject_HEAD + unsigned long ident; // TODO ULL instead + Py_uintptr_t handle; + char joinable; +} ThreadHandleObject; + +static ThreadHandleObject* +new_thread_handle(thread_module_state* state) +{ + ThreadHandleObject* self = PyObject_New(ThreadHandleObject, state->thread_handle_type); + if (self == NULL) { + return NULL; + } + self->ident = 0; + self->handle = 0; + self->joinable = 0; + return self; +} + +static void +ThreadHandle_dealloc(ThreadHandleObject *self) +{ + PyObject *tp = (PyObject *) Py_TYPE(self); + if (self->joinable) { + int ret = PyThread_detach_thread(self->handle); + if (ret) { + PyErr_SetString(ThreadError, "Failed detaching thread"); + PyErr_WriteUnraisable(tp); } - return -1; } - *ident = PyLong_AsUnsignedLong(ident_obj); - unsigned long long ull_handle = PyLong_AsUnsignedLongLong(handle_obj); - if ((*ident == (unsigned long) -1 || ull_handle == (unsigned long long) -1) - && PyErr_Occurred()) { - // This should not occur as we control the contents of state->joinable_dict - return -1; + PyObject_Free(self); + Py_DECREF(tp); +} + +static PyObject * +ThreadHandle_repr(ThreadHandleObject *self) +{ + return PyUnicode_FromFormat("<%s object: ident=%ul>", + Py_TYPE(self)->tp_name, self->ident); +} + +static PyObject * +ThreadHandle_get_ident(ThreadHandleObject *self, void *ignored) +{ + return PyLong_FromUnsignedLong(self->ident); +} + +static PyObject * +ThreadHandle_detach(ThreadHandleObject *self, void* ignored) +{ + if (!self->joinable) { + PyErr_SetString(PyExc_ValueError, + "the thread is not joinable and thus cannot be detached"); + return NULL; } - *handle = (Py_uintptr_t) ull_handle; - return 0; + self->joinable = 0; + // This is typically short so no need to release the GIL + int ret = PyThread_detach_thread(self->handle); + if (ret) { + PyErr_SetString(ThreadError, "Failed detaching thread"); + return NULL; + } + Py_RETURN_NONE; +} + +static PyObject * +ThreadHandle_join(ThreadHandleObject *self, void* ignored) +{ + if (!self->joinable) { + PyErr_SetString(PyExc_ValueError, "the thread is not joinable"); + return NULL; + } + if (self->ident == PyThread_get_thread_ident()) { + // PyThread_join_thread() would deadlock or error out. + PyErr_SetString(ThreadError, "Cannot join current thread"); + return NULL; + } + // Before actually joining, we must first mark the thread as non-joinable, + // as joining several times simultaneously or sequentially is undefined behavior. + self->joinable = 0; + int ret; + Py_BEGIN_ALLOW_THREADS + ret = PyThread_join_thread(self->handle); + Py_END_ALLOW_THREADS + if (ret) { + PyErr_SetString(ThreadError, "Failed joining thread"); + return NULL; + } + Py_RETURN_NONE; } +static PyGetSetDef ThreadHandle_getsetlist[] = { + {"ident", (getter)ThreadHandle_get_ident, NULL, NULL}, + {0}, +}; + +static PyMethodDef ThreadHandle_methods[] = +{ + {"detach", (PyCFunction)ThreadHandle_detach, METH_NOARGS}, + {"join", (PyCFunction)ThreadHandle_join, METH_NOARGS}, + {0, 0} +}; + +static PyType_Slot ThreadHandle_Type_slots[] = { + {Py_tp_dealloc, (destructor)ThreadHandle_dealloc}, + {Py_tp_repr, (reprfunc)ThreadHandle_repr}, + {Py_tp_getset, ThreadHandle_getsetlist}, + {Py_tp_methods, ThreadHandle_methods}, + {0, 0} +}; + +static PyType_Spec ThreadHandle_Type_spec = { + "_thread._ThreadHandle", + sizeof(ThreadHandleObject), + 0, + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_DISALLOW_INSTANTIATION, + ThreadHandle_Type_slots, +}; + /* Lock objects */ typedef struct { @@ -1129,51 +1231,22 @@ PyDoc_STRVAR(daemon_threads_allowed_doc, Return True if daemon threads are allowed in the current interpreter,\n\ and False otherwise.\n"); -static PyObject * -thread_PyThread_start_new_thread(PyObject *module, PyObject *fargs) +static int +do_start_new_thread(thread_module_state* state, + PyObject *func, PyObject* args, PyObject* kwargs, + int joinable, + unsigned long* ident, Py_uintptr_t* handle) { - PyObject *func, *args, *kwargs = NULL, *joinable = NULL; - thread_module_state *state = get_thread_state(module); - - if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 4, - &func, &args, &kwargs, &joinable)) - return NULL; - if (!PyCallable_Check(func)) { - PyErr_SetString(PyExc_TypeError, - "first arg must be callable"); - return NULL; - } - if (!PyTuple_Check(args)) { - PyErr_SetString(PyExc_TypeError, - "2nd arg must be a tuple"); - return NULL; - } - if (kwargs != NULL && !PyDict_Check(kwargs)) { - PyErr_SetString(PyExc_TypeError, - "optional 3rd arg must be a dictionary"); - return NULL; - } - if (joinable != NULL && !PyBool_Check(joinable)) { - PyErr_SetString(PyExc_TypeError, - "optional 4th arg must be a boolean"); - return NULL; - } - - if (PySys_Audit("_thread.start_new_thread", "OOO", - func, args, kwargs ? kwargs : Py_None) < 0) { - return NULL; - } - PyInterpreterState *interp = _PyInterpreterState_GET(); if (!_PyInterpreterState_HasFeature(interp, Py_RTFLAGS_THREADS)) { PyErr_SetString(PyExc_RuntimeError, "thread is not supported for isolated subinterpreters"); - return NULL; + return -1; } if (interp->finalizing) { PyErr_SetString(PyExc_RuntimeError, "can't create new thread at interpreter shutdown"); - return NULL; + return -1; } // gh-109795: Use PyMem_RawMalloc() instead of PyMem_Malloc(), @@ -1181,55 +1254,77 @@ thread_PyThread_start_new_thread(PyObject *module, PyObject *fargs) // without holding the GIL. struct bootstate *boot = PyMem_RawMalloc(sizeof(struct bootstate)); if (boot == NULL) { - return PyErr_NoMemory(); + PyErr_NoMemory(); + return -1; } boot->tstate = _PyThreadState_New(interp, _PyThreadState_WHENCE_THREADING); if (boot->tstate == NULL) { PyMem_RawFree(boot); if (!PyErr_Occurred()) { - return PyErr_NoMemory(); + PyErr_NoMemory(); } - return NULL; + return -1; } boot->func = Py_NewRef(func); boot->args = Py_NewRef(args); boot->kwargs = Py_XNewRef(kwargs); - unsigned long ident; - Py_uintptr_t handle = 0; - if (joinable == Py_True) { - ident = PyThread_start_joinable_thread(thread_run, (void*) boot, &handle); + if (joinable) { + *ident = PyThread_start_joinable_thread(thread_run, (void*) boot, handle); } else { - ident = PyThread_start_new_thread(thread_run, (void*) boot); + *handle = 0; + *ident = PyThread_start_new_thread(thread_run, (void*) boot); } - if (ident == PYTHREAD_INVALID_THREAD_ID) { + if (*ident == PYTHREAD_INVALID_THREAD_ID) { PyErr_SetString(ThreadError, "can't start new thread"); PyThreadState_Clear(boot->tstate); thread_bootstate_free(boot, 1); + return -1; + } + return 0; +} + +static PyObject * +thread_PyThread_start_new_thread(PyObject *module, PyObject *fargs) +{ + PyObject *func, *args, *kwargs = NULL; + thread_module_state *state = get_thread_state(module); + + if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3, + &func, &args, &kwargs)) + return NULL; + if (!PyCallable_Check(func)) { + PyErr_SetString(PyExc_TypeError, + "first arg must be callable"); return NULL; } - PyObject* ident_obj = PyLong_FromUnsignedLong(ident); - if (ident_obj == NULL) { + if (!PyTuple_Check(args)) { + PyErr_SetString(PyExc_TypeError, + "2nd arg must be a tuple"); return NULL; } - if (joinable == Py_True) { - PyObject* handle_obj = PyLong_FromUnsignedLongLong(handle); - if (handle_obj == NULL) { - Py_DECREF(ident_obj); - return NULL; - } - if (PyDict_SetItem(state->joinable_dict, ident_obj, handle_obj)) { - Py_DECREF(handle_obj); - Py_DECREF(ident_obj); - return NULL; - } - Py_DECREF(handle_obj); + if (kwargs != NULL && !PyDict_Check(kwargs)) { + PyErr_SetString(PyExc_TypeError, + "optional 3rd arg must be a dictionary"); + return NULL; + } + + if (PySys_Audit("_thread.start_new_thread", "OOO", + func, args, kwargs ? kwargs : Py_None) < 0) { + return NULL; } - return ident_obj; + + unsigned long ident = 0; + Py_uintptr_t handle; + if (do_start_new_thread(state, func, args, kwargs, /*joinable=*/ 0, + &ident, &handle)) { + return NULL; + } + return PyLong_FromUnsignedLong(ident); } PyDoc_STRVAR(start_new_doc, -"start_new_thread(function, args[, kwargs[, joinable]])\n\ +"start_new_thread(function, args[, kwargs])\n\ (start_new() is an obsolete synonym)\n\ \n\ Start a new thread and return its identifier.\n\ @@ -1239,108 +1334,67 @@ tuple args and keyword arguments taken from the optional dictionary\n\ kwargs. The thread exits when the function returns; the return value\n\ is ignored. The thread will also exit when the function raises an\n\ unhandled exception; a stack trace will be printed unless the exception\n\ -is SystemExit.\n\ -If the optional joinable argument is True, then the thread must later\n\ -be joined with join_thread() or detached with detach_thread().\n\ -Failure to do so results in a system resource leak until interpreter\n\ -shutdown.\n"); - -static PyObject * -thread_PyThread_exit_thread(PyObject *self, PyObject *Py_UNUSED(ignored)) -{ - PyErr_SetNone(PyExc_SystemExit); - return NULL; -} - -PyDoc_STRVAR(exit_doc, -"exit()\n\ -(exit_thread() is an obsolete synonym)\n\ -\n\ -This is synonymous to ``raise SystemExit''. It will cause the current\n\ -thread to exit silently unless the exception is caught."); +is SystemExit.\n"); static PyObject * -thread_PyThread_join_thread(PyObject *module, PyObject *ident_obj) +thread_PyThread_start_joinable_thread(PyObject *module, PyObject *func) { thread_module_state *state = get_thread_state(module); - if (!PyLong_Check(ident_obj)) { - PyErr_Format(PyExc_TypeError, "thread ident must be an int, not %s", - Py_TYPE(ident_obj)->tp_name); + if (!PyCallable_Check(func)) { + PyErr_SetString(PyExc_TypeError, + "thread function must be callable"); return NULL; } - // Check if the ident is part of the joinable threads and fetch its handle. - unsigned long ident; - Py_uintptr_t handle; - if (get_joinable_thread(state, ident_obj, &ident, &handle)) { + + if (PySys_Audit("_thread.start_joinable_thread", "O", func) < 0) { return NULL; } - if (ident == PyThread_get_thread_ident()) { - // PyThread_join_thread() would deadlock or error out. - PyErr_SetString(ThreadError, "Cannot join current thread"); + + PyObject* args = PyTuple_New(0); + if (args == NULL) { return NULL; } - // Before actually joining, we must first remove the ident from joinable_dict, - // as joining several times simultaneously or sequentially is undefined behavior. - if (PyDict_DelItem(state->joinable_dict, ident_obj)) { + ThreadHandleObject* hobj = new_thread_handle(state); + if (hobj == NULL) { + Py_DECREF(args); return NULL; } - int ret; - Py_BEGIN_ALLOW_THREADS - ret = PyThread_join_thread((Py_uintptr_t) handle); - Py_END_ALLOW_THREADS - if (ret) { - PyErr_SetString(ThreadError, "Failed joining thread"); + if (do_start_new_thread(state, func, args, /*kwargs=*/ NULL, /*joinable=*/ 1, + &hobj->ident, &hobj->handle)) { + Py_DECREF(args); + Py_DECREF(hobj); return NULL; } - Py_RETURN_NONE; + Py_DECREF(args); + hobj->joinable = 1; + return (PyObject*) hobj; } -PyDoc_STRVAR(join_thread_doc, -"join_thread(ident)\n\ +PyDoc_STRVAR(start_joinable_doc, +"start_joinable_thread(function)\n\ +\n\ +*For internal use only*: start a new thread.\n\ \n\ -Join the thread with the given identifier. The thread must have been started\n\ -with start_new_thread() with the `joinable` argument set to True.\n\ -This function can only be called once per joinable thread, and it cannot be\n\ -called if detach_thread() was previously called on the same thread.\n"); +Like start_new_thread(), this starts a new thread calling the given function.\n\ +Unlike start_new_thread(), this returns a handle object with methods to join\n\ +or detach the given thread.\n\ +This function is not for third-party code, please use the\n\ +`threading` module instead.\n"); static PyObject * -thread_PyThread_detach_thread(PyObject *module, PyObject *ident_obj) +thread_PyThread_exit_thread(PyObject *self, PyObject *Py_UNUSED(ignored)) { - thread_module_state *state = get_thread_state(module); - - if (!PyLong_Check(ident_obj)) { - PyErr_Format(PyExc_TypeError, "thread ident must be an int, not %s", - Py_TYPE(ident_obj)->tp_name); - return NULL; - } - unsigned long ident; - Py_uintptr_t handle; - if (get_joinable_thread(state, ident_obj, &ident, &handle)) { - return NULL; - } - if (PyDict_DelItem(state->joinable_dict, ident_obj)) { - if (PyErr_ExceptionMatches(PyExc_KeyError)) { - PyErr_SetString(PyExc_ValueError, - "the given thread is not joinable and thus cannot be detached"); - } - return NULL; - } - int ret = PyThread_detach_thread(handle); - if (ret) { - PyErr_SetString(ThreadError, "Failed detaching thread"); - return NULL; - } - Py_RETURN_NONE; + PyErr_SetNone(PyExc_SystemExit); + return NULL; } -PyDoc_STRVAR(detach_thread_doc, -"detach_thread(ident)\n\ +PyDoc_STRVAR(exit_doc, +"exit()\n\ +(exit_thread() is an obsolete synonym)\n\ \n\ -Detach the thread with the given identifier. The thread must have been started\n\ -with start_new_thread() with the `joinable` argument set to True.\n\ -This function can only be called once per joinable thread, and it cannot be\n\ -called if join_thread() was previously called on the same thread.\n"); +This is synonymous to ``raise SystemExit''. It will cause the current\n\ +thread to exit silently unless the exception is caught."); static PyObject * thread_PyThread_interrupt_main(PyObject *self, PyObject *args) @@ -1712,10 +1766,8 @@ static PyMethodDef thread_methods[] = { METH_VARARGS, start_new_doc}, {"start_new", (PyCFunction)thread_PyThread_start_new_thread, METH_VARARGS, start_new_doc}, - {"join_thread", (PyCFunction)thread_PyThread_join_thread, - METH_O, join_thread_doc}, - {"detach_thread", (PyCFunction)thread_PyThread_detach_thread, - METH_O, detach_thread_doc}, + {"start_joinable_thread", (PyCFunction)thread_PyThread_start_joinable_thread, + METH_O, start_joinable_doc}, {"daemon_threads_allowed", (PyCFunction)thread_daemon_threads_allowed, METH_NOARGS, daemon_threads_allowed_doc}, {"allocate_lock", thread_PyThread_allocate_lock, @@ -1759,6 +1811,15 @@ thread_module_exec(PyObject *module) // Initialize the C thread library PyThread_init_thread(); + // _ThreadHandle + state->thread_handle_type = (PyTypeObject *)PyType_FromSpec(&ThreadHandle_Type_spec); + if (state->thread_handle_type == NULL) { + return -1; + } + if (PyDict_SetItemString(d, "_ThreadHandle", (PyObject *)state->thread_handle_type) < 0) { + return -1; + } + // Lock state->lock_type = (PyTypeObject *)PyType_FromSpec(&lock_type_spec); if (state->lock_type == NULL) { @@ -1794,12 +1855,6 @@ thread_module_exec(PyObject *module) return -1; } - // Dict of joinable threads: ident -> handle - state->joinable_dict = PyDict_New(); - if (state->joinable_dict == NULL) { - return -1; - } - // Add module attributes if (PyDict_SetItemString(d, "error", ThreadError) < 0) { return -1; @@ -1838,7 +1893,7 @@ thread_module_traverse(PyObject *module, visitproc visit, void *arg) Py_VISIT(state->lock_type); Py_VISIT(state->local_type); Py_VISIT(state->local_dummy_type); - Py_VISIT(state->joinable_dict); + Py_VISIT(state->thread_handle_type); return 0; } @@ -1850,24 +1905,7 @@ thread_module_clear(PyObject *module) Py_CLEAR(state->lock_type); Py_CLEAR(state->local_type); Py_CLEAR(state->local_dummy_type); - // To avoid resource leaks, detach all still joinable threads - if (state->joinable_dict != NULL) { - PyObject *key, *value; - Py_ssize_t pos = 0; - - while (PyDict_Next(state->joinable_dict, &pos, &key, &value)) { - if (PyLong_Check(value)) { - unsigned long long handle = PyLong_AsUnsignedLongLong(value); - if (handle == (unsigned long long) -1 && PyErr_Occurred()) { - // Should not happen - PyErr_Clear(); - } else { - PyThread_detach_thread((Py_uintptr_t) handle); - } - } - } - Py_CLEAR(state->joinable_dict); - } + Py_CLEAR(state->thread_handle_type); return 0; } From d14db26f5f75c9b74a895cb12792685d06e8edf1 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Fri, 27 Oct 2023 16:20:31 +0200 Subject: [PATCH 13/19] Use `unsigned long long` for thread ids --- Include/internal/pycore_pythread.h | 9 ++++--- Modules/_threadmodule.c | 40 ++++++++++++++++-------------- Python/thread_nt.h | 32 ++++++++++++++++-------- Python/thread_pthread.h | 32 +++++++++++++----------- 4 files changed, 66 insertions(+), 47 deletions(-) diff --git a/Include/internal/pycore_pythread.h b/Include/internal/pycore_pythread.h index 80541b1c20c899..f1020cc1e60fab 100644 --- a/Include/internal/pycore_pythread.h +++ b/Include/internal/pycore_pythread.h @@ -106,6 +106,8 @@ PyAPI_FUNC(PyLockStatus) PyThread_acquire_lock_timed_with_retries( PyThread_type_lock, PY_TIMEOUT_T microseconds); +PyAPI_FUNC(unsigned long long) PyThread_get_thread_ident_ex(void); + /* Thread joining APIs. * * These APIs have a strict contract: @@ -117,9 +119,10 @@ PyAPI_FUNC(PyLockStatus) PyThread_acquire_lock_timed_with_retries( * PyThread_detach_thread, or calling them more than once (including * simultaneously), results in undefined behavior. */ -PyAPI_FUNC(unsigned long) PyThread_start_joinable_thread(void (*func)(void *), - void *arg, - Py_uintptr_t* handle); +PyAPI_FUNC(int) PyThread_start_joinable_thread(void (*func)(void *), + void *arg, + unsigned long long* ident, + Py_uintptr_t* handle); /* * Join a thread started with `PyThread_start_joinable_thread`. * This function cannot be interrupted. It returns 0 on success, diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index 5b8e5ecb76b50c..2ce90395554ca5 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -43,7 +43,7 @@ get_thread_state(PyObject *module) typedef struct { PyObject_HEAD - unsigned long ident; // TODO ULL instead + unsigned long long ident; Py_uintptr_t handle; char joinable; } ThreadHandleObject; @@ -79,14 +79,14 @@ ThreadHandle_dealloc(ThreadHandleObject *self) static PyObject * ThreadHandle_repr(ThreadHandleObject *self) { - return PyUnicode_FromFormat("<%s object: ident=%ul>", + return PyUnicode_FromFormat("<%s object: ident=%llu>", Py_TYPE(self)->tp_name, self->ident); } static PyObject * ThreadHandle_get_ident(ThreadHandleObject *self, void *ignored) { - return PyLong_FromUnsignedLong(self->ident); + return PyLong_FromUnsignedLongLong(self->ident); } static PyObject * @@ -114,7 +114,7 @@ ThreadHandle_join(ThreadHandleObject *self, void* ignored) PyErr_SetString(PyExc_ValueError, "the thread is not joinable"); return NULL; } - if (self->ident == PyThread_get_thread_ident()) { + if (self->ident == PyThread_get_thread_ident_ex()) { // PyThread_join_thread() would deadlock or error out. PyErr_SetString(ThreadError, "Cannot join current thread"); return NULL; @@ -396,7 +396,7 @@ static PyType_Spec lock_type_spec = { typedef struct { PyObject_HEAD PyThread_type_lock rlock_lock; - unsigned long rlock_owner; + unsigned long long rlock_owner; unsigned long rlock_count; PyObject *in_weakreflist; } rlockobject; @@ -433,13 +433,13 @@ static PyObject * rlock_acquire(rlockobject *self, PyObject *args, PyObject *kwds) { _PyTime_t timeout; - unsigned long tid; + unsigned long long tid; PyLockStatus r = PY_LOCK_ACQUIRED; if (lock_acquire_parse_args(args, kwds, &timeout) < 0) return NULL; - tid = PyThread_get_thread_ident(); + tid = PyThread_get_thread_ident_ex(); if (self->rlock_count > 0 && tid == self->rlock_owner) { unsigned long count = self->rlock_count + 1; if (count <= self->rlock_count) { @@ -482,7 +482,7 @@ the lock is taken and its internal counter initialized to 1."); static PyObject * rlock_release(rlockobject *self, PyObject *Py_UNUSED(ignored)) { - unsigned long tid = PyThread_get_thread_ident(); + unsigned long long tid = PyThread_get_thread_ident_ex(); if (self->rlock_count == 0 || self->rlock_owner != tid) { PyErr_SetString(PyExc_RuntimeError, @@ -566,7 +566,7 @@ For internal use by `threading.Condition`."); static PyObject * rlock_recursion_count(rlockobject *self, PyObject *Py_UNUSED(ignored)) { - unsigned long tid = PyThread_get_thread_ident(); + unsigned long long tid = PyThread_get_thread_ident_ex(); return PyLong_FromUnsignedLong( self->rlock_owner == tid ? self->rlock_count : 0UL); } @@ -579,7 +579,7 @@ For internal use by reentrancy checks."); static PyObject * rlock_is_owned(rlockobject *self, PyObject *Py_UNUSED(ignored)) { - unsigned long tid = PyThread_get_thread_ident(); + unsigned long long tid = PyThread_get_thread_ident_ex(); if (self->rlock_count > 0 && self->rlock_owner == tid) { Py_RETURN_TRUE; @@ -1235,7 +1235,7 @@ static int do_start_new_thread(thread_module_state* state, PyObject *func, PyObject* args, PyObject* kwargs, int joinable, - unsigned long* ident, Py_uintptr_t* handle) + unsigned long long* ident, Py_uintptr_t* handle) { PyInterpreterState *interp = _PyInterpreterState_GET(); if (!_PyInterpreterState_HasFeature(interp, Py_RTFLAGS_THREADS)) { @@ -1269,13 +1269,15 @@ do_start_new_thread(thread_module_state* state, boot->args = Py_NewRef(args); boot->kwargs = Py_XNewRef(kwargs); + int err; if (joinable) { - *ident = PyThread_start_joinable_thread(thread_run, (void*) boot, handle); + err = PyThread_start_joinable_thread(thread_run, (void*) boot, ident, handle); } else { *handle = 0; *ident = PyThread_start_new_thread(thread_run, (void*) boot); + err = (*ident == PYTHREAD_INVALID_THREAD_ID); } - if (*ident == PYTHREAD_INVALID_THREAD_ID) { + if (err) { PyErr_SetString(ThreadError, "can't start new thread"); PyThreadState_Clear(boot->tstate); thread_bootstate_free(boot, 1); @@ -1314,13 +1316,13 @@ thread_PyThread_start_new_thread(PyObject *module, PyObject *fargs) return NULL; } - unsigned long ident = 0; + unsigned long long ident = 0; Py_uintptr_t handle; if (do_start_new_thread(state, func, args, kwargs, /*joinable=*/ 0, &ident, &handle)) { return NULL; } - return PyLong_FromUnsignedLong(ident); + return PyLong_FromUnsignedLongLong(ident); } PyDoc_STRVAR(start_new_doc, @@ -1440,12 +1442,12 @@ information about locks."); static PyObject * thread_get_ident(PyObject *self, PyObject *Py_UNUSED(ignored)) { - unsigned long ident = PyThread_get_thread_ident(); + unsigned long long ident = PyThread_get_thread_ident_ex(); if (ident == PYTHREAD_INVALID_THREAD_ID) { PyErr_SetString(ThreadError, "no current thread ident"); return NULL; } - return PyLong_FromUnsignedLong(ident); + return PyLong_FromUnsignedLongLong(ident); } PyDoc_STRVAR(get_ident_doc, @@ -1632,8 +1634,8 @@ thread_excepthook_file(PyObject *file, PyObject *exc_type, PyObject *exc_value, Py_DECREF(name); } else { - unsigned long ident = PyThread_get_thread_ident(); - PyObject *str = PyUnicode_FromFormat("%lu", ident); + unsigned long long ident = PyThread_get_thread_ident_ex(); + PyObject *str = PyUnicode_FromFormat("%llu", ident); if (str != NULL) { if (PyFile_WriteObject(str, file, Py_PRINT_RAW) < 0) { Py_DECREF(str); diff --git a/Python/thread_nt.h b/Python/thread_nt.h index fba5078ea47cda..ad2259cd4083db 100644 --- a/Python/thread_nt.h +++ b/Python/thread_nt.h @@ -182,8 +182,9 @@ bootstrap(void *call) return 0; } -unsigned long -PyThread_start_joinable_thread(void (*func)(void *), void *arg, Py_uintptr_t* handle) { +int +PyThread_start_joinable_thread(void (*func)(void *), void *arg, + unsigned long long* ident, Py_uintptr_t* handle) { HANDLE hThread; unsigned threadID; callobj *obj; @@ -193,7 +194,7 @@ PyThread_start_joinable_thread(void (*func)(void *), void *arg, Py_uintptr_t* ha obj = (callobj*)HeapAlloc(GetProcessHeap(), 0, sizeof(*obj)); if (!obj) - return PYTHREAD_INVALID_THREAD_ID; + return -1; obj->func = func; obj->arg = arg; PyThreadState *tstate = _PyThreadState_GET(); @@ -207,20 +208,22 @@ PyThread_start_joinable_thread(void (*func)(void *), void *arg, Py_uintptr_t* ha * too many threads". */ HeapFree(GetProcessHeap(), 0, obj); - return PYTHREAD_INVALID_THREAD_ID; + return -1; } + *ident = threadID; *handle = (Py_uintptr_t) hThread; - return threadID; + return 0; } unsigned long PyThread_start_new_thread(void (*func)(void *), void *arg) { Py_uintptr_t handle; - unsigned long threadID = PyThread_start_joinable_thread(func, arg, &handle); - if (handle) { - CloseHandle((HANDLE) handle); + unsigned long long ident; + if (PyThread_start_joinable_thread(func, arg, &ident, &handle)) { + return PYTHREAD_INVALID_THREAD_ID; } - return threadID; + CloseHandle((HANDLE) handle); + return ident; } int @@ -241,8 +244,8 @@ PyThread_detach_thread(Py_uintptr_t handle) { * Return the thread Id instead of a handle. The Id is said to uniquely identify the * thread in the system */ -unsigned long -PyThread_get_thread_ident(void) +unsigned long long +PyThread_get_thread_ident_ex(void) { if (!initialized) PyThread_init_thread(); @@ -250,6 +253,13 @@ PyThread_get_thread_ident(void) return GetCurrentThreadId(); } +unsigned long +PyThread_get_thread_ident(void) +{ + return (unsigned long) PyThread_get_thread_ident_ex(); +} + + #ifdef PY_HAVE_THREAD_NATIVE_ID /* * Return the native Thread ID (TID) of the calling thread. diff --git a/Python/thread_pthread.h b/Python/thread_pthread.h index 5d08429c36dedc..70e072bea02860 100644 --- a/Python/thread_pthread.h +++ b/Python/thread_pthread.h @@ -298,20 +298,18 @@ do_start_joinable_thread(void (*func)(void *), void *arg, pthread_t* out_id) return 0; } -unsigned long -PyThread_start_joinable_thread(void (*func)(void *), void *arg, Py_uintptr_t* handle) { +int +PyThread_start_joinable_thread(void (*func)(void *), void *arg, + unsigned long long* ident, Py_uintptr_t* handle) { pthread_t th = (pthread_t) 0; if (do_start_joinable_thread(func, arg, &th)) { - return PYTHREAD_INVALID_THREAD_ID; + return -1; } - assert(th == (pthread_t) (unsigned long) th); - assert(th == (pthread_t) (Py_uintptr_t) th); + *ident = (unsigned long long) th; *handle = (Py_uintptr_t) th; -#if SIZEOF_PTHREAD_T <= SIZEOF_LONG - return (unsigned long) th; -#else - return (unsigned long) *(unsigned long *) &th; -#endif + assert(th == (pthread_t) *ident); + assert(th == (pthread_t) *handle); + return 0; } unsigned long @@ -345,14 +343,20 @@ PyThread_detach_thread(Py_uintptr_t th) { - The cast to unsigned long is inherently unsafe. - It is not clear that the 'volatile' (for AIX?) are any longer necessary. */ -unsigned long -PyThread_get_thread_ident(void) -{ +unsigned long long +PyThread_get_thread_ident_ex(void) { volatile pthread_t threadid; if (!initialized) PyThread_init_thread(); threadid = pthread_self(); - return (unsigned long) threadid; + assert(threadid == (pthread_t) (unsigned long long) threadid); + return (unsigned long long) threadid; +} + +unsigned long +PyThread_get_thread_ident(void) +{ + return (unsigned long) PyThread_get_thread_ident_ex(); } #ifdef PY_HAVE_THREAD_NATIVE_ID From 99f3f5283c5baca55e1d75278d11ffb2dfe14870 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 30 Oct 2023 22:16:20 +0100 Subject: [PATCH 14/19] Ensure fork safety --- Include/internal/pycore_pythread.h | 6 ++++++ Lib/threading.py | 27 ++++++++++++++++----------- Modules/_threadmodule.c | 18 ++++++++++++++++++ Python/thread_nt.h | 4 ++++ Python/thread_pthread.h | 10 ++++++++++ 5 files changed, 54 insertions(+), 11 deletions(-) diff --git a/Include/internal/pycore_pythread.h b/Include/internal/pycore_pythread.h index f1020cc1e60fab..e8664a1c3533a2 100644 --- a/Include/internal/pycore_pythread.h +++ b/Include/internal/pycore_pythread.h @@ -137,6 +137,12 @@ PyAPI_FUNC(int) PyThread_join_thread(Py_uintptr_t); */ PyAPI_FUNC(int) PyThread_detach_thread(Py_uintptr_t); +/* + * Obtain the new thread ident and handle in a forked child process. + */ +PyAPI_FUNC(void) PyThread_update_thread_after_fork(unsigned long long* ident, + Py_uintptr_t* handle); + #ifdef __cplusplus } #endif diff --git a/Lib/threading.py b/Lib/threading.py index 6711f7f9cf1e97..85aff58968082d 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -590,7 +590,7 @@ def __repr__(self): return f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}: {status}>" def _at_fork_reinit(self): - # Private method called by Thread._reset_internal_locks() + # Private method called by Thread._after_fork() self._cond._at_fork_reinit() def is_set(self): @@ -936,11 +936,15 @@ class is implemented. # For debugging and _after_fork() _dangling.add(self) - def _reset_internal_locks(self, is_alive): - # private! Called by _after_fork() to reset our internal locks as - # they may be in an invalid state leading to a deadlock or crash. + def _after_fork(self, new_ident=None): + # Private! Called by threading._after_fork(). self._started._at_fork_reinit() - if is_alive: + if new_ident is not None: + # This thread is alive. + self._ident = new_ident + if self._handle is not None: + self._handle.after_fork_alive() + assert self._handle.ident == new_ident # bpo-42350: If the fork happens when the thread is already stopped # (ex: after threading._shutdown() has been called), _tstate_lock # is None. Do nothing in this case. @@ -950,11 +954,14 @@ def _reset_internal_locks(self, is_alive): if self._join_lock is not None: self._join_lock._at_fork_reinit() else: - # The thread isn't alive after fork: it doesn't have a tstate + # This thread isn't alive after fork: it doesn't have a tstate # anymore. self._is_stopped = True self._tstate_lock = None self._join_lock = None + if self._handle is not None: + self._handle.after_fork_dead() + self._handle = None def __repr__(self): assert self._initialized, "Thread.__init__() was not called" @@ -1707,15 +1714,13 @@ def _after_fork(): # Any lock/condition variable may be currently locked or in an # invalid state, so we reinitialize them. if thread is current: - # There is only one active thread. We reset the ident to - # its new value since it can have changed. - thread._reset_internal_locks(True) + # This is the one and only active thread. ident = get_ident() - thread._ident = ident + thread._after_fork(new_ident=ident) new_active[ident] = thread else: # All the others are already stopped. - thread._reset_internal_locks(False) + thread._after_fork() thread._stop() _limbo.clear() diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index 2ce90395554ca5..fe281e19c54815 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -89,6 +89,22 @@ ThreadHandle_get_ident(ThreadHandleObject *self, void *ignored) return PyLong_FromUnsignedLongLong(self->ident); } + +static PyObject * +ThreadHandle_after_fork_alive(ThreadHandleObject *self, void* ignored) +{ + PyThread_update_thread_after_fork(&self->ident, &self->handle); + Py_RETURN_NONE; +} + +static PyObject * +ThreadHandle_after_fork_dead(ThreadHandleObject *self, void* ignored) +{ + // Disallow calls to detach() and join() as they could crash. + self->joinable = 0; + Py_RETURN_NONE; +} + static PyObject * ThreadHandle_detach(ThreadHandleObject *self, void* ignored) { @@ -140,6 +156,8 @@ static PyGetSetDef ThreadHandle_getsetlist[] = { static PyMethodDef ThreadHandle_methods[] = { + {"after_fork_alive", (PyCFunction)ThreadHandle_after_fork_alive, METH_NOARGS}, + {"after_fork_dead", (PyCFunction)ThreadHandle_after_fork_dead, METH_NOARGS}, {"detach", (PyCFunction)ThreadHandle_detach, METH_NOARGS}, {"join", (PyCFunction)ThreadHandle_join, METH_NOARGS}, {0, 0} diff --git a/Python/thread_nt.h b/Python/thread_nt.h index ad2259cd4083db..0a072e8cb48ccc 100644 --- a/Python/thread_nt.h +++ b/Python/thread_nt.h @@ -240,6 +240,10 @@ PyThread_detach_thread(Py_uintptr_t handle) { return (CloseHandle(hThread) == 0); } +void +PyThread_update_thread_after_fork(unsigned long long* ident, Py_uintptr_t* handle) { +} + /* * Return the thread Id instead of a handle. The Id is said to uniquely identify the * thread in the system diff --git a/Python/thread_pthread.h b/Python/thread_pthread.h index 70e072bea02860..a462f324337332 100644 --- a/Python/thread_pthread.h +++ b/Python/thread_pthread.h @@ -337,6 +337,16 @@ PyThread_detach_thread(Py_uintptr_t th) { return pthread_detach((pthread_t) th); } +void +PyThread_update_thread_after_fork(unsigned long long* ident, Py_uintptr_t* handle) { + // The thread id might have been updated in the forked child + pthread_t th = pthread_self(); + *ident = (unsigned long long) th; + *handle = (Py_uintptr_t) th; + assert(th == (pthread_t) *ident); + assert(th == (pthread_t) *handle); +} + /* XXX This implementation is considered (to quote Tim Peters) "inherently hosed" because: - It does not guarantee the promise that a non-zero integer is returned. From 5a909ca788fee6e951956a4666279bd24435ecc4 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 30 Oct 2023 22:18:48 +0100 Subject: [PATCH 15/19] Safe casts on Windows --- Python/thread_nt.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Python/thread_nt.h b/Python/thread_nt.h index 0a072e8cb48ccc..1d6e97dddf8154 100644 --- a/Python/thread_nt.h +++ b/Python/thread_nt.h @@ -211,6 +211,7 @@ PyThread_start_joinable_thread(void (*func)(void *), void *arg, return -1; } *ident = threadID; + // The cast is safe since HANDLE is pointer-sized *handle = (Py_uintptr_t) hThread; return 0; } @@ -223,7 +224,8 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) { return PYTHREAD_INVALID_THREAD_ID; } CloseHandle((HANDLE) handle); - return ident; + // The cast is safe since the ident is really an unsigned int + return (unsigned long) ident; } int From 420ece663dc30213cde026d411c0880a0f51588f Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 30 Oct 2023 22:27:03 +0100 Subject: [PATCH 16/19] Introduce PyThread_ident_t, PyThread_handle_t --- Include/internal/pycore_pythread.h | 19 ++++++++++++------- Modules/_threadmodule.c | 28 ++++++++++++++-------------- Python/thread_nt.h | 16 ++++++++-------- Python/thread_pthread.h | 22 +++++++++++----------- 4 files changed, 45 insertions(+), 40 deletions(-) diff --git a/Include/internal/pycore_pythread.h b/Include/internal/pycore_pythread.h index e8664a1c3533a2..91ba80d4f3b548 100644 --- a/Include/internal/pycore_pythread.h +++ b/Include/internal/pycore_pythread.h @@ -106,7 +106,12 @@ PyAPI_FUNC(PyLockStatus) PyThread_acquire_lock_timed_with_retries( PyThread_type_lock, PY_TIMEOUT_T microseconds); -PyAPI_FUNC(unsigned long long) PyThread_get_thread_ident_ex(void); +typedef unsigned long long PyThread_ident_t; +typedef Py_uintptr_t PyThread_handle_t; + +#define PY_FORMAT_THREAD_IDENT_T "llu" + +PyAPI_FUNC(PyThread_ident_t) PyThread_get_thread_ident_ex(void); /* Thread joining APIs. * @@ -121,27 +126,27 @@ PyAPI_FUNC(unsigned long long) PyThread_get_thread_ident_ex(void); */ PyAPI_FUNC(int) PyThread_start_joinable_thread(void (*func)(void *), void *arg, - unsigned long long* ident, - Py_uintptr_t* handle); + PyThread_ident_t* ident, + PyThread_handle_t* handle); /* * Join a thread started with `PyThread_start_joinable_thread`. * This function cannot be interrupted. It returns 0 on success, * a non-zero value on failure. */ -PyAPI_FUNC(int) PyThread_join_thread(Py_uintptr_t); +PyAPI_FUNC(int) PyThread_join_thread(PyThread_handle_t); /* * Detach a thread started with `PyThread_start_joinable_thread`, such * that its resources are relased as soon as it exits. * This function cannot be interrupted. It returns 0 on success, * a non-zero value on failure. */ -PyAPI_FUNC(int) PyThread_detach_thread(Py_uintptr_t); +PyAPI_FUNC(int) PyThread_detach_thread(PyThread_handle_t); /* * Obtain the new thread ident and handle in a forked child process. */ -PyAPI_FUNC(void) PyThread_update_thread_after_fork(unsigned long long* ident, - Py_uintptr_t* handle); +PyAPI_FUNC(void) PyThread_update_thread_after_fork(PyThread_ident_t* ident, + PyThread_handle_t* handle); #ifdef __cplusplus } diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index fe281e19c54815..be7cbaa59e1a41 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -43,8 +43,8 @@ get_thread_state(PyObject *module) typedef struct { PyObject_HEAD - unsigned long long ident; - Py_uintptr_t handle; + PyThread_ident_t ident; + PyThread_handle_t handle; char joinable; } ThreadHandleObject; @@ -79,7 +79,7 @@ ThreadHandle_dealloc(ThreadHandleObject *self) static PyObject * ThreadHandle_repr(ThreadHandleObject *self) { - return PyUnicode_FromFormat("<%s object: ident=%llu>", + return PyUnicode_FromFormat("<%s object: ident=" PY_FORMAT_THREAD_IDENT_T ">", Py_TYPE(self)->tp_name, self->ident); } @@ -414,7 +414,7 @@ static PyType_Spec lock_type_spec = { typedef struct { PyObject_HEAD PyThread_type_lock rlock_lock; - unsigned long long rlock_owner; + PyThread_ident_t rlock_owner; unsigned long rlock_count; PyObject *in_weakreflist; } rlockobject; @@ -451,7 +451,7 @@ static PyObject * rlock_acquire(rlockobject *self, PyObject *args, PyObject *kwds) { _PyTime_t timeout; - unsigned long long tid; + PyThread_ident_t tid; PyLockStatus r = PY_LOCK_ACQUIRED; if (lock_acquire_parse_args(args, kwds, &timeout) < 0) @@ -500,7 +500,7 @@ the lock is taken and its internal counter initialized to 1."); static PyObject * rlock_release(rlockobject *self, PyObject *Py_UNUSED(ignored)) { - unsigned long long tid = PyThread_get_thread_ident_ex(); + PyThread_ident_t tid = PyThread_get_thread_ident_ex(); if (self->rlock_count == 0 || self->rlock_owner != tid) { PyErr_SetString(PyExc_RuntimeError, @@ -584,7 +584,7 @@ For internal use by `threading.Condition`."); static PyObject * rlock_recursion_count(rlockobject *self, PyObject *Py_UNUSED(ignored)) { - unsigned long long tid = PyThread_get_thread_ident_ex(); + PyThread_ident_t tid = PyThread_get_thread_ident_ex(); return PyLong_FromUnsignedLong( self->rlock_owner == tid ? self->rlock_count : 0UL); } @@ -597,7 +597,7 @@ For internal use by reentrancy checks."); static PyObject * rlock_is_owned(rlockobject *self, PyObject *Py_UNUSED(ignored)) { - unsigned long long tid = PyThread_get_thread_ident_ex(); + PyThread_ident_t tid = PyThread_get_thread_ident_ex(); if (self->rlock_count > 0 && self->rlock_owner == tid) { Py_RETURN_TRUE; @@ -1253,7 +1253,7 @@ static int do_start_new_thread(thread_module_state* state, PyObject *func, PyObject* args, PyObject* kwargs, int joinable, - unsigned long long* ident, Py_uintptr_t* handle) + PyThread_ident_t* ident, PyThread_handle_t* handle) { PyInterpreterState *interp = _PyInterpreterState_GET(); if (!_PyInterpreterState_HasFeature(interp, Py_RTFLAGS_THREADS)) { @@ -1334,8 +1334,8 @@ thread_PyThread_start_new_thread(PyObject *module, PyObject *fargs) return NULL; } - unsigned long long ident = 0; - Py_uintptr_t handle; + PyThread_ident_t ident = 0; + PyThread_handle_t handle; if (do_start_new_thread(state, func, args, kwargs, /*joinable=*/ 0, &ident, &handle)) { return NULL; @@ -1460,7 +1460,7 @@ information about locks."); static PyObject * thread_get_ident(PyObject *self, PyObject *Py_UNUSED(ignored)) { - unsigned long long ident = PyThread_get_thread_ident_ex(); + PyThread_ident_t ident = PyThread_get_thread_ident_ex(); if (ident == PYTHREAD_INVALID_THREAD_ID) { PyErr_SetString(ThreadError, "no current thread ident"); return NULL; @@ -1652,8 +1652,8 @@ thread_excepthook_file(PyObject *file, PyObject *exc_type, PyObject *exc_value, Py_DECREF(name); } else { - unsigned long long ident = PyThread_get_thread_ident_ex(); - PyObject *str = PyUnicode_FromFormat("%llu", ident); + PyThread_ident_t ident = PyThread_get_thread_ident_ex(); + PyObject *str = PyUnicode_FromFormat("%" PY_FORMAT_THREAD_IDENT_T, ident); if (str != NULL) { if (PyFile_WriteObject(str, file, Py_PRINT_RAW) < 0) { Py_DECREF(str); diff --git a/Python/thread_nt.h b/Python/thread_nt.h index 1d6e97dddf8154..14b9cddc24c0ec 100644 --- a/Python/thread_nt.h +++ b/Python/thread_nt.h @@ -184,7 +184,7 @@ bootstrap(void *call) int PyThread_start_joinable_thread(void (*func)(void *), void *arg, - unsigned long long* ident, Py_uintptr_t* handle) { + PyThread_ident_t* ident, PyThread_handle_t* handle) { HANDLE hThread; unsigned threadID; callobj *obj; @@ -212,14 +212,14 @@ PyThread_start_joinable_thread(void (*func)(void *), void *arg, } *ident = threadID; // The cast is safe since HANDLE is pointer-sized - *handle = (Py_uintptr_t) hThread; + *handle = (PyThread_handle_t) hThread; return 0; } unsigned long PyThread_start_new_thread(void (*func)(void *), void *arg) { - Py_uintptr_t handle; - unsigned long long ident; + PyThread_handle_t handle; + PyThread_ident_t ident; if (PyThread_start_joinable_thread(func, arg, &ident, &handle)) { return PYTHREAD_INVALID_THREAD_ID; } @@ -229,7 +229,7 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) { } int -PyThread_join_thread(Py_uintptr_t handle) { +PyThread_join_thread(PyThread_handle_t handle) { HANDLE hThread = (HANDLE) handle; int errored = (WaitForSingleObject(hThread, INFINITE) != WAIT_OBJECT_0); CloseHandle(hThread); @@ -237,20 +237,20 @@ PyThread_join_thread(Py_uintptr_t handle) { } int -PyThread_detach_thread(Py_uintptr_t handle) { +PyThread_detach_thread(PyThread_handle_t handle) { HANDLE hThread = (HANDLE) handle; return (CloseHandle(hThread) == 0); } void -PyThread_update_thread_after_fork(unsigned long long* ident, Py_uintptr_t* handle) { +PyThread_update_thread_after_fork(PyThread_ident_t* ident, PyThread_handle_t* handle) { } /* * Return the thread Id instead of a handle. The Id is said to uniquely identify the * thread in the system */ -unsigned long long +PyThread_ident_t PyThread_get_thread_ident_ex(void) { if (!initialized) diff --git a/Python/thread_pthread.h b/Python/thread_pthread.h index a462f324337332..a8df5449714a81 100644 --- a/Python/thread_pthread.h +++ b/Python/thread_pthread.h @@ -300,13 +300,13 @@ do_start_joinable_thread(void (*func)(void *), void *arg, pthread_t* out_id) int PyThread_start_joinable_thread(void (*func)(void *), void *arg, - unsigned long long* ident, Py_uintptr_t* handle) { + PyThread_ident_t* ident, PyThread_handle_t* handle) { pthread_t th = (pthread_t) 0; if (do_start_joinable_thread(func, arg, &th)) { return -1; } - *ident = (unsigned long long) th; - *handle = (Py_uintptr_t) th; + *ident = (PyThread_ident_t) th; + *handle = (PyThread_handle_t) th; assert(th == (pthread_t) *ident); assert(th == (pthread_t) *handle); return 0; @@ -328,21 +328,21 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) } int -PyThread_join_thread(Py_uintptr_t th) { +PyThread_join_thread(PyThread_handle_t th) { return pthread_join((pthread_t) th, NULL); } int -PyThread_detach_thread(Py_uintptr_t th) { +PyThread_detach_thread(PyThread_handle_t th) { return pthread_detach((pthread_t) th); } void -PyThread_update_thread_after_fork(unsigned long long* ident, Py_uintptr_t* handle) { +PyThread_update_thread_after_fork(PyThread_ident_t* ident, PyThread_handle_t* handle) { // The thread id might have been updated in the forked child pthread_t th = pthread_self(); - *ident = (unsigned long long) th; - *handle = (Py_uintptr_t) th; + *ident = (PyThread_ident_t) th; + *handle = (PyThread_handle_t) th; assert(th == (pthread_t) *ident); assert(th == (pthread_t) *handle); } @@ -353,14 +353,14 @@ PyThread_update_thread_after_fork(unsigned long long* ident, Py_uintptr_t* handl - The cast to unsigned long is inherently unsafe. - It is not clear that the 'volatile' (for AIX?) are any longer necessary. */ -unsigned long long +PyThread_ident_t PyThread_get_thread_ident_ex(void) { volatile pthread_t threadid; if (!initialized) PyThread_init_thread(); threadid = pthread_self(); - assert(threadid == (pthread_t) (unsigned long long) threadid); - return (unsigned long long) threadid; + assert(threadid == (pthread_t) (PyThread_ident_t) threadid); + return (PyThread_ident_t) threadid; } unsigned long From 96849d13cb648a8fca3d8273945737680d4b9c21 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 4 Nov 2023 12:05:13 +0100 Subject: [PATCH 17/19] Address compiler warnings --- Include/internal/pycore_pythread.h | 1 + Modules/_threadmodule.c | 14 ++++++++------ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/Include/internal/pycore_pythread.h b/Include/internal/pycore_pythread.h index 91ba80d4f3b548..9c9a09f60f3441 100644 --- a/Include/internal/pycore_pythread.h +++ b/Include/internal/pycore_pythread.h @@ -110,6 +110,7 @@ typedef unsigned long long PyThread_ident_t; typedef Py_uintptr_t PyThread_handle_t; #define PY_FORMAT_THREAD_IDENT_T "llu" +#define Py_PARSE_THREAD_IDENT_T "K" PyAPI_FUNC(PyThread_ident_t) PyThread_get_thread_ident_ex(void); diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index be7cbaa59e1a41..88ca9032b5e679 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -79,7 +79,7 @@ ThreadHandle_dealloc(ThreadHandleObject *self) static PyObject * ThreadHandle_repr(ThreadHandleObject *self) { - return PyUnicode_FromFormat("<%s object: ident=" PY_FORMAT_THREAD_IDENT_T ">", + return PyUnicode_FromFormat("<%s object: ident=%" PY_FORMAT_THREAD_IDENT_T ">", Py_TYPE(self)->tp_name, self->ident); } @@ -529,11 +529,12 @@ to be available for other threads."); static PyObject * rlock_acquire_restore(rlockobject *self, PyObject *args) { - unsigned long owner; + PyThread_ident_t owner; unsigned long count; int r = 1; - if (!PyArg_ParseTuple(args, "(kk):_acquire_restore", &count, &owner)) + if (!PyArg_ParseTuple(args, "(k" Py_PARSE_THREAD_IDENT_T "):_acquire_restore", + &count, &owner)) return NULL; if (!PyThread_acquire_lock(self->rlock_lock, 0)) { @@ -559,7 +560,7 @@ For internal use by `threading.Condition`."); static PyObject * rlock_release_save(rlockobject *self, PyObject *Py_UNUSED(ignored)) { - unsigned long owner; + PyThread_ident_t owner; unsigned long count; if (self->rlock_count == 0) { @@ -573,7 +574,7 @@ rlock_release_save(rlockobject *self, PyObject *Py_UNUSED(ignored)) self->rlock_count = 0; self->rlock_owner = 0; PyThread_release_lock(self->rlock_lock); - return Py_BuildValue("kk", count, owner); + return Py_BuildValue("k" Py_PARSE_THREAD_IDENT_T, count, owner); } PyDoc_STRVAR(rlock_release_save_doc, @@ -633,7 +634,8 @@ rlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds) static PyObject * rlock_repr(rlockobject *self) { - return PyUnicode_FromFormat("<%s %s object owner=%ld count=%lu at %p>", + return PyUnicode_FromFormat( + "<%s %s object owner=%" PY_FORMAT_THREAD_IDENT_T " count=%lu at %p>", self->rlock_count ? "locked" : "unlocked", Py_TYPE(self)->tp_name, self->rlock_owner, self->rlock_count, self); From 5cf268427fcfe5e1aa1b9209ff57efb7a26758df Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 4 Nov 2023 12:19:23 +0100 Subject: [PATCH 18/19] Fix merge error --- Lib/test/_test_multiprocessing.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 679f354b1f721d..ec003d8dc4314d 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2581,10 +2581,18 @@ def test_async(self): self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) def test_async_timeout(self): - res = self.pool.apply_async(sqr, (6, 5 * TIMEOUT2)) - get = TimingWrapper(res.get) - self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) - self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) + p = self.Pool(3) + try: + event = threading.Event() if self.TYPE == 'threads' else None + res = p.apply_async(sqr, (6, TIMEOUT2 + support.SHORT_TIMEOUT, event)) + get = TimingWrapper(res.get) + self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) + self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) + finally: + if event is not None: + event.set() + p.terminate() + p.join() def test_imap(self): it = self.pool.imap(sqr, list(range(10))) From fa11244fb74659e61fc737316dd98f4a3714d9b9 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Sat, 4 Nov 2023 13:36:54 +0000 Subject: [PATCH 19/19] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20b?= =?UTF-8?q?lurb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../2023-11-04-13-36-51.gh-issue-110829.Pa0CJI.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Core and Builtins/2023-11-04-13-36-51.gh-issue-110829.Pa0CJI.rst diff --git a/Misc/NEWS.d/next/Core and Builtins/2023-11-04-13-36-51.gh-issue-110829.Pa0CJI.rst b/Misc/NEWS.d/next/Core and Builtins/2023-11-04-13-36-51.gh-issue-110829.Pa0CJI.rst new file mode 100644 index 00000000000000..f4fa61db369ece --- /dev/null +++ b/Misc/NEWS.d/next/Core and Builtins/2023-11-04-13-36-51.gh-issue-110829.Pa0CJI.rst @@ -0,0 +1 @@ +Joining a thread now ensures the underlying OS thread has exited. This is required for safer fork() in multi-threaded processes.