diff --git a/Include/cpython/pystate.h b/Include/cpython/pystate.h index f33c72d4cf4d2a..24d92f2b247284 100644 --- a/Include/cpython/pystate.h +++ b/Include/cpython/pystate.h @@ -188,32 +188,6 @@ struct _ts { struct _py_trashcan trash; - /* Called when a thread state is deleted normally, but not when it - * is destroyed after fork(). - * Pain: to prevent rare but fatal shutdown errors (issue 18808), - * Thread.join() must wait for the join'ed thread's tstate to be unlinked - * from the tstate chain. That happens at the end of a thread's life, - * in pystate.c. - * The obvious way doesn't quite work: create a lock which the tstate - * unlinking code releases, and have Thread.join() wait to acquire that - * lock. The problem is that we _are_ at the end of the thread's life: - * if the thread holds the last reference to the lock, decref'ing the - * lock will delete the lock, and that may trigger arbitrary Python code - * if there's a weakref, with a callback, to the lock. But by this time - * _PyRuntime.gilstate.tstate_current is already NULL, so only the simplest - * of C code can be allowed to run (in particular it must not be possible to - * release the GIL). - * So instead of holding the lock directly, the tstate holds a weakref to - * the lock: that's the value of on_delete_data below. Decref'ing a - * weakref is harmless. - * on_delete points to _threadmodule.c's static release_sentinel() function. - * After the tstate is unlinked, release_sentinel is called with the - * weakref-to-lock (on_delete_data) argument, and release_sentinel releases - * the indirectly held lock. 
- */ - void (*on_delete)(void *); - void *on_delete_data; - int coroutine_origin_tracking_depth; PyObject *async_gen_firstiter; diff --git a/Include/internal/pycore_interp.h b/Include/internal/pycore_interp.h index edc076fc04f6c3..98f620c59c6b24 100644 --- a/Include/internal/pycore_interp.h +++ b/Include/internal/pycore_interp.h @@ -57,8 +57,6 @@ struct _is { uint64_t next_unique_id; /* The linked list of threads, newest first. */ PyThreadState *head; - /* Used in Modules/_threadmodule.c. */ - long count; /* Support for runtime thread stack size tuning. A value of 0 means using the platform's default stack size or the size specified by the THREAD_STACK_SIZE macro. */ diff --git a/Lib/threading.py b/Lib/threading.py index df273870fa4273..48e1550dbb85b0 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -49,6 +49,7 @@ except AttributeError: _CRLock = None TIMEOUT_MAX = _thread.TIMEOUT_MAX +_internal_after_fork = _thread._after_fork del _thread @@ -1143,9 +1144,7 @@ def _wait_for_tstate_lock(self, block=True, timeout=-1): return try: - if lock.acquire(block, timeout): - lock.release() - self._stop() + locked = lock.acquire(block, timeout) except: if lock.locked(): # bpo-45274: lock.acquire() acquired the lock, but the function @@ -1155,6 +1154,9 @@ def _wait_for_tstate_lock(self, block=True, timeout=-1): lock.release() self._stop() raise + if locked: + lock.release() + self._stop() @property def name(self): @@ -1677,4 +1679,5 @@ def _after_fork(): if hasattr(_os, "register_at_fork"): + _os.register_at_fork(after_in_child=_internal_after_fork) _os.register_at_fork(after_in_child=_after_fork) diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index 5d753b4a0ebc5e..172f49700123cd 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -20,9 +20,215 @@ // Forward declarations static struct PyModuleDef thread_module; +struct module_state; +static struct lockobject *newlockobject(struct module_state *); -typedef struct { +/* threads owned by 
the module */ + +struct module_thread { + PyThreadState *tstate; + /* This lock is released right after the Python code finishes. */ + PyObject *running_lock; + /* Called when a thread state is deleted normally, but not when it + * is destroyed after fork(). + * Pain: to prevent rare but fatal shutdown errors (issue 18808), + * Thread.join() must wait for the join'ed thread's tstate to be unlinked + * from the tstate chain. That happens at the end of a thread's life, + * in pystate.c. + * The obvious way doesn't quite work: create a lock which the tstate + * unlinking code releases, and have Thread.join() wait to acquire that + * lock. The problem is that we _are_ at the end of the thread's life: + * if the thread holds the last reference to the lock, decref'ing the + * lock will delete the lock, and that may trigger arbitrary Python code + * if there's a weakref, with a callback, to the lock. But by this time + * _PyRuntime.gilstate.tstate_current is already NULL, so only the simplest + * of C code can be allowed to run (in particular it must not be possible to + * release the GIL). + * So instead of holding the lock directly, the tstate holds a weakref to + * the lock: that's the value of running_lock_weakref. Decref'ing a + * weakref is harmless. + * After the tstate is unlinked, release_sentinel is called with the + * weakref-to-lock (running_lock_weakref) argument, and release_sentinel releases + * the indirectly held lock. + */ + PyObject *running_lock_weakref; + struct module_thread *prev; + struct module_thread *next; +}; + +struct module_threads { + // XXX This can replace interp->threads.count. 
+ long num_total; + long num_running; + PyThread_type_lock mutex; + struct module_thread *head; + struct module_thread *tail; +}; + +static int +module_threads_init(struct module_threads *threads) +{ + threads->num_total = 0; + threads->num_running = 0; + threads->head = NULL; + threads->tail = NULL; + threads->mutex = PyThread_allocate_lock(); + if (threads->mutex == NULL) { + PyErr_NoMemory(); + return -1; + } + return 0; +} + +static int +module_threads_reinit(struct module_threads *threads) +{ + if (_PyThread_at_fork_reinit(threads->mutex) < 0) { + PyErr_SetString(ThreadError, "failed to reinitialize lock at fork"); + return -1; + } + return 0; +} + +static void +module_threads_fini(struct module_threads *threads) +{ + PyThread_free_lock(threads->mutex); +} + +static void +module_threads_add(struct module_threads *threads, struct module_thread *mt) +{ + PyThread_acquire_lock(threads->mutex, WAIT_LOCK); + + // Add it to the end of the list. + if (threads->head == NULL) { + threads->head = mt; + } + else { + mt->prev = threads->tail; + threads->tail->next = mt; + } + threads->tail = mt; + threads->num_total++; + + PyThread_release_lock(threads->mutex); +} + +static void +module_threads_remove(struct module_threads *threads, struct module_thread *mt) +{ + PyThread_acquire_lock(threads->mutex, WAIT_LOCK); + + if (mt->prev == NULL) { + threads->head = mt->next; + } + else { + mt->prev->next = mt->next; + } + if (mt->next == NULL) { + threads->tail = mt->prev; + } + else { + mt->next->prev = mt->prev; + } + threads->num_total--; + + PyThread_release_lock(threads->mutex); +} + +static struct module_thread * +module_threads_lookup(struct module_threads *threads, PyThreadState *tstate) +{ + PyThread_acquire_lock(threads->mutex, WAIT_LOCK); + + struct module_thread *mt = threads->head; + while (mt != NULL) { + if (mt->tstate == tstate) { + break; + } + mt = mt->next; + } + + PyThread_release_lock(threads->mutex); + return mt; +} + +static struct module_thread * 
+add_module_thread(struct module_state *state, struct module_threads *threads, + PyThreadState *tstate) +{ + // Create the new list entry. + struct module_thread *mt = PyMem_RawMalloc(sizeof(struct module_thread)); + if (mt == NULL) { + if (!PyErr_Occurred()) { + PyErr_NoMemory(); + } + return NULL; + } + mt->tstate = tstate; + mt->prev = NULL; + mt->next = NULL; + + // Create the "running" lock. + mt->running_lock = (PyObject *) newlockobject(state); + if (mt->running_lock == NULL) { + PyErr_NoMemory(); + return NULL; + } + mt->running_lock_weakref = NULL; + + module_threads_add(threads, mt); + + return mt; +} + +static int +module_thread_starting(struct module_threads *threads, struct module_thread *mt) +{ + assert(mt->tstate == PyThreadState_Get()); + + threads->num_running++; + + return 0; +} + +static void +module_thread_finished(struct module_threads *threads, struct module_thread *mt) +{ + threads->num_running--; + + // The threading module may keep this alive past here. + Py_CLEAR(mt->running_lock); + + // XXX We should be notifying other threads here. +} + +static void release_sentinel(PyObject *wr); + +static void +remove_module_thread(struct module_threads *threads, struct module_thread *mt) +{ + // Notify other threads that this one is done. + // XXX This should happen in module_thread_finished(). + if (mt->running_lock_weakref != NULL) { + release_sentinel(mt->running_lock_weakref); + } + + // Remove it from the list. + module_threads_remove(threads, mt); + + // Deallocate everything. 
+ PyMem_RawFree(mt); +} + + +/* module state */ + +typedef struct module_state { + struct module_threads threads; + PyTypeObject *excepthook_type; PyTypeObject *lock_type; PyTypeObject *local_type; @@ -40,7 +246,7 @@ get_thread_state(PyObject *module) /* Lock objects */ -typedef struct { +typedef struct lockobject { PyObject_HEAD PyThread_type_lock lock_lock; PyObject *in_weakreflist; @@ -597,10 +803,8 @@ static PyType_Spec rlock_type_spec = { }; static lockobject * -newlockobject(PyObject *module) +newlockobject(thread_module_state *state) { - thread_module_state *state = get_thread_state(module); - PyTypeObject *type = state->lock_type; lockobject *self = (lockobject *)type->tp_alloc(type, 0); if (self == NULL) { @@ -1048,12 +1252,12 @@ _localdummy_destroyed(PyObject *localweakref, PyObject *dummyweakref) /* Module functions */ struct bootstate { - PyInterpreterState *interp; PyObject *func; PyObject *args; PyObject *kwargs; PyThreadState *tstate; - _PyRuntimeState *runtime; + thread_module_state *module_state; + struct module_thread *module_thread; }; @@ -1071,12 +1275,14 @@ static void thread_run(void *boot_raw) { struct bootstate *boot = (struct bootstate *) boot_raw; - PyThreadState *tstate; + PyThreadState *tstate = boot->tstate; + thread_module_state *state = boot->module_state; + struct module_thread *mt = boot->module_thread; - tstate = boot->tstate; _PyThreadState_Bind(tstate); PyEval_AcquireThread(tstate); - tstate->interp->threads.count++; + + module_thread_starting(&state->threads, mt); PyObject *res = PyObject_Call(boot->func, boot->args, boot->kwargs); if (res == NULL) { @@ -1091,9 +1297,12 @@ thread_run(void *boot_raw) Py_DECREF(res); } + module_thread_finished(&state->threads, mt); + thread_bootstate_free(boot); - tstate->interp->threads.count--; PyThreadState_Clear(tstate); + // XXX This should be called last: + remove_module_thread(&state->threads, mt); _PyThreadState_DeleteCurrent(tstate); // bpo-44434: Don't call explicitly 
PyThread_exit_thread(). On Linux with @@ -1122,7 +1331,6 @@ and False otherwise.\n"); static PyObject * thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs) { - _PyRuntimeState *runtime = &_PyRuntime; PyObject *func, *args, *kwargs = NULL; if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3, @@ -1160,8 +1368,7 @@ thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs) if (boot == NULL) { return PyErr_NoMemory(); } - boot->interp = _PyInterpreterState_GET(); - boot->tstate = _PyThreadState_New(boot->interp); + boot->tstate = _PyThreadState_New(interp); if (boot->tstate == NULL) { PyMem_Free(boot); if (!PyErr_Occurred()) { @@ -1169,7 +1376,15 @@ thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs) } return NULL; } - boot->runtime = runtime; + thread_module_state *state = get_thread_state(self); + boot->module_state = state; + boot->module_thread = add_module_thread(state, &state->threads, + boot->tstate); + if (boot->module_thread == NULL) { + PyThreadState_Clear(boot->tstate); + PyMem_Free(boot); + return NULL; + } boot->func = Py_NewRef(func); boot->args = Py_NewRef(args); boot->kwargs = Py_XNewRef(kwargs); @@ -1235,12 +1450,11 @@ A subthread can use this function to interrupt the main thread.\n\ Note: the default signal handler for SIGINT raises ``KeyboardInterrupt``." 
); -static lockobject *newlockobject(PyObject *module); - static PyObject * thread_PyThread_allocate_lock(PyObject *module, PyObject *Py_UNUSED(ignored)) { - return (PyObject *) newlockobject(module); + thread_module_state *state = get_thread_state(module); + return (PyObject *) newlockobject(state); } PyDoc_STRVAR(allocate_doc, @@ -1291,8 +1505,8 @@ particular thread within a system."); static PyObject * thread__count(PyObject *self, PyObject *Py_UNUSED(ignored)) { - PyInterpreterState *interp = _PyInterpreterState_GET(); - return PyLong_FromLong(interp->threads.count); + thread_module_state *state = get_thread_state(self); + return PyLong_FromLong(state->threads.num_running); } PyDoc_STRVAR(_count_doc, @@ -1308,9 +1522,8 @@ This function is meant for internal and specialized purposes only.\n\ In most applications `threading.enumerate()` should be used instead."); static void -release_sentinel(void *wr_raw) +release_sentinel(PyObject *wr) { - PyObject *wr = _PyObject_CAST(wr_raw); /* Tricky: this function is called when the current thread state is being deleted. Therefore, only simple C code can safely execute here. */ @@ -1333,30 +1546,36 @@ thread__set_sentinel(PyObject *module, PyObject *Py_UNUSED(ignored)) { PyObject *wr; PyThreadState *tstate = _PyThreadState_GET(); - lockobject *lock; + thread_module_state *state = get_thread_state(module); + + PyObject *lock; + struct module_thread *mt = module_threads_lookup(&state->threads, tstate); + if (mt == NULL) { + /* It must be the "main" thread. */ + mt = add_module_thread(state, &state->threads, tstate); + if (mt == NULL) { + return NULL; + } + } + else { + assert(mt->running_lock != NULL); + } + lock = Py_NewRef(mt->running_lock); - if (tstate->on_delete_data != NULL) { + if (mt->running_lock_weakref != NULL) { /* We must support the re-creation of the lock from a fork()ed child. 
*/ - assert(tstate->on_delete == &release_sentinel); - wr = (PyObject *) tstate->on_delete_data; - tstate->on_delete = NULL; - tstate->on_delete_data = NULL; - Py_DECREF(wr); - } - lock = newlockobject(module); - if (lock == NULL) - return NULL; + Py_CLEAR(mt->running_lock_weakref); + } /* The lock is owned by whoever called _set_sentinel(), but the weakref hangs to the thread state. */ - wr = PyWeakref_NewRef((PyObject *) lock, NULL); + wr = PyWeakref_NewRef(lock, NULL); if (wr == NULL) { - Py_DECREF(lock); return NULL; } - tstate->on_delete_data = (void *) wr; - tstate->on_delete = &release_sentinel; - return (PyObject *) lock; + mt->running_lock_weakref = wr; + + return lock; } PyDoc_STRVAR(_set_sentinel_doc, @@ -1563,6 +1782,16 @@ PyDoc_STRVAR(excepthook_doc, \n\ Handle uncaught Thread.run() exception."); +static PyObject * +thread__after_fork(PyObject *module, PyObject *Py_UNUSED(ignored)) +{ + thread_module_state *state = get_thread_state(module); + if (module_threads_reinit(&state->threads) < 0) { + return NULL; + } + Py_RETURN_NONE; +} + static PyMethodDef thread_methods[] = { {"start_new_thread", (PyCFunction)thread_PyThread_start_new_thread, METH_VARARGS, start_new_doc}, @@ -1592,8 +1821,10 @@ static PyMethodDef thread_methods[] = { METH_VARARGS, stack_size_doc}, {"_set_sentinel", thread__set_sentinel, METH_NOARGS, _set_sentinel_doc}, - {"_excepthook", thread_excepthook, + {"_excepthook", thread_excepthook, METH_O, excepthook_doc}, + {"_after_fork", thread__after_fork, + METH_NOARGS, NULL}, {NULL, NULL} /* sentinel */ }; @@ -1609,6 +1840,11 @@ thread_module_exec(PyObject *module) // Initialize the C thread library PyThread_init_thread(); + // Initialize the list of threads owned by this module. 
+ if (module_threads_init(&state->threads) < 0) { + return -1; + } + // Lock state->lock_type = (PyTypeObject *)PyType_FromSpec(&lock_type_spec); if (state->lock_type == NULL) { @@ -1699,6 +1935,8 @@ thread_module_clear(PyObject *module) static void thread_module_free(void *module) { + thread_module_state *state = get_thread_state(module); + module_threads_fini(&state->threads); thread_module_clear((PyObject *)module); } diff --git a/Python/pystate.c b/Python/pystate.c index 25e655a2027918..a084e1db7fff26 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -1490,10 +1490,6 @@ PyThreadState_Clear(PyThreadState *tstate) Py_CLEAR(tstate->context); - if (tstate->on_delete != NULL) { - tstate->on_delete(tstate->on_delete_data); - } - tstate->_status.cleared = 1; // XXX Call _PyThreadStateSwap(runtime, NULL) here if "current".