Skip to content

Commit a9d9778

Browse files
pitroublurb-it[bot]
authored andcommitted
pythonGH-110829: Ensure Thread.join() joins the OS thread (python#110848)
Joining a thread now ensures the underlying OS thread has exited. This is required for safer fork() in multi-threaded processes. --------- Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
1 parent b4821a4 commit a9d9778

14 files changed

+671
-98
lines changed

Include/cpython/pthread_stubs.h

+1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ PyAPI_FUNC(int) pthread_create(pthread_t *restrict thread,
8383
void *(*start_routine)(void *),
8484
void *restrict arg);
8585
PyAPI_FUNC(int) pthread_detach(pthread_t thread);
86+
PyAPI_FUNC(int) pthread_join(pthread_t thread, void** value_ptr);
8687
PyAPI_FUNC(pthread_t) pthread_self(void);
8788
PyAPI_FUNC(int) pthread_exit(void *retval) __attribute__ ((__noreturn__));
8889
PyAPI_FUNC(int) pthread_attr_init(pthread_attr_t *attr);

Include/internal/pycore_pythread.h

+42
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,48 @@ PyAPI_FUNC(PyLockStatus) PyThread_acquire_lock_timed_with_retries(
106106
PyThread_type_lock,
107107
PY_TIMEOUT_T microseconds);
108108

109+
typedef unsigned long long PyThread_ident_t;
110+
typedef Py_uintptr_t PyThread_handle_t;
111+
112+
#define PY_FORMAT_THREAD_IDENT_T "llu"
113+
#define Py_PARSE_THREAD_IDENT_T "K"
114+
115+
PyAPI_FUNC(PyThread_ident_t) PyThread_get_thread_ident_ex(void);
116+
117+
/* Thread joining APIs.
118+
*
119+
* These APIs have a strict contract:
120+
* - Either PyThread_join_thread or PyThread_detach_thread must be called
121+
* exactly once with the given handle.
122+
* - Calling neither PyThread_join_thread nor PyThread_detach_thread results
123+
* in a resource leak until the end of the process.
124+
* - Any other usage, such as calling both PyThread_join_thread and
125+
* PyThread_detach_thread, or calling them more than once (including
126+
* simultaneously), results in undefined behavior.
127+
*/
128+
PyAPI_FUNC(int) PyThread_start_joinable_thread(void (*func)(void *),
129+
void *arg,
130+
PyThread_ident_t* ident,
131+
PyThread_handle_t* handle);
132+
/*
133+
* Join a thread started with `PyThread_start_joinable_thread`.
134+
* This function cannot be interrupted. It returns 0 on success,
135+
* a non-zero value on failure.
136+
*/
137+
PyAPI_FUNC(int) PyThread_join_thread(PyThread_handle_t);
138+
/*
139+
* Detach a thread started with `PyThread_start_joinable_thread`, such
140+
* that its resources are relased as soon as it exits.
141+
* This function cannot be interrupted. It returns 0 on success,
142+
* a non-zero value on failure.
143+
*/
144+
PyAPI_FUNC(int) PyThread_detach_thread(PyThread_handle_t);
145+
146+
/*
147+
* Obtain the new thread ident and handle in a forked child process.
148+
*/
149+
PyAPI_FUNC(void) PyThread_update_thread_after_fork(PyThread_ident_t* ident,
150+
PyThread_handle_t* handle);
109151

110152
#ifdef __cplusplus
111153
}

Lib/test/_test_multiprocessing.py

+3
Original file line numberDiff line numberDiff line change
@@ -2693,6 +2693,9 @@ def test_make_pool(self):
26932693
p.join()
26942694

26952695
def test_terminate(self):
2696+
if self.TYPE == 'threads':
2697+
self.skipTest("Threads cannot be terminated")
2698+
26962699
# Simulate slow tasks which take "forever" to complete
26972700
p = self.Pool(3)
26982701
args = [support.LONG_TIMEOUT for i in range(10_000)]

Lib/test/audit-tests.py

+3
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,9 @@ def __call__(self):
455455
i = _thread.start_new_thread(test_func(), ())
456456
lock.acquire()
457457

458+
handle = _thread.start_joinable_thread(test_func())
459+
handle.join()
460+
458461

459462
def test_threading_abort():
460463
# Ensures that aborting PyThreadState_New raises the correct exception

Lib/test/test_audit.py

+2
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,8 @@ def test_threading(self):
209209
expected = [
210210
("_thread.start_new_thread", "(<test_func>, (), None)"),
211211
("test.test_func", "()"),
212+
("_thread.start_joinable_thread", "(<test_func>,)"),
213+
("test.test_func", "()"),
212214
]
213215

214216
self.assertEqual(actual, expected)

Lib/test/test_concurrent_futures/test_process_pool.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -194,11 +194,11 @@ def test_python_finalization_error(self):
194194

195195
context = self.get_context()
196196

197-
# gh-109047: Mock the threading.start_new_thread() function to inject
197+
# gh-109047: Mock the threading.start_joinable_thread() function to inject
198198
# RuntimeError: simulate the error raised during Python finalization.
199199
# Block the second creation: create _ExecutorManagerThread, but block
200200
# QueueFeederThread.
201-
orig_start_new_thread = threading._start_new_thread
201+
orig_start_new_thread = threading._start_joinable_thread
202202
nthread = 0
203203
def mock_start_new_thread(func, *args):
204204
nonlocal nthread
@@ -208,7 +208,7 @@ def mock_start_new_thread(func, *args):
208208
nthread += 1
209209
return orig_start_new_thread(func, *args)
210210

211-
with support.swap_attr(threading, '_start_new_thread',
211+
with support.swap_attr(threading, '_start_joinable_thread',
212212
mock_start_new_thread):
213213
executor = self.executor_type(max_workers=2, mp_context=context)
214214
with executor:

Lib/test/test_thread.py

+126
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,132 @@ def task():
160160
f"Exception ignored in thread started by {task!r}")
161161
self.assertIsNotNone(cm.unraisable.exc_traceback)
162162

163+
def test_join_thread(self):
164+
finished = []
165+
166+
def task():
167+
time.sleep(0.05)
168+
finished.append(thread.get_ident())
169+
170+
with threading_helper.wait_threads_exit():
171+
handle = thread.start_joinable_thread(task)
172+
handle.join()
173+
self.assertEqual(len(finished), 1)
174+
self.assertEqual(handle.ident, finished[0])
175+
176+
def test_join_thread_already_exited(self):
177+
def task():
178+
pass
179+
180+
with threading_helper.wait_threads_exit():
181+
handle = thread.start_joinable_thread(task)
182+
time.sleep(0.05)
183+
handle.join()
184+
185+
def test_join_several_times(self):
186+
def task():
187+
pass
188+
189+
with threading_helper.wait_threads_exit():
190+
handle = thread.start_joinable_thread(task)
191+
handle.join()
192+
with self.assertRaisesRegex(ValueError, "not joinable"):
193+
handle.join()
194+
195+
def test_joinable_not_joined(self):
196+
handle_destroyed = thread.allocate_lock()
197+
handle_destroyed.acquire()
198+
199+
def task():
200+
handle_destroyed.acquire()
201+
202+
with threading_helper.wait_threads_exit():
203+
handle = thread.start_joinable_thread(task)
204+
del handle
205+
handle_destroyed.release()
206+
207+
def test_join_from_self(self):
208+
errors = []
209+
handles = []
210+
start_joinable_thread_returned = thread.allocate_lock()
211+
start_joinable_thread_returned.acquire()
212+
task_tried_to_join = thread.allocate_lock()
213+
task_tried_to_join.acquire()
214+
215+
def task():
216+
start_joinable_thread_returned.acquire()
217+
try:
218+
handles[0].join()
219+
except Exception as e:
220+
errors.append(e)
221+
finally:
222+
task_tried_to_join.release()
223+
224+
with threading_helper.wait_threads_exit():
225+
handle = thread.start_joinable_thread(task)
226+
handles.append(handle)
227+
start_joinable_thread_returned.release()
228+
# Can still join after joining failed in other thread
229+
task_tried_to_join.acquire()
230+
handle.join()
231+
232+
assert len(errors) == 1
233+
with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"):
234+
raise errors[0]
235+
236+
def test_detach_from_self(self):
237+
errors = []
238+
handles = []
239+
start_joinable_thread_returned = thread.allocate_lock()
240+
start_joinable_thread_returned.acquire()
241+
thread_detached = thread.allocate_lock()
242+
thread_detached.acquire()
243+
244+
def task():
245+
start_joinable_thread_returned.acquire()
246+
try:
247+
handles[0].detach()
248+
except Exception as e:
249+
errors.append(e)
250+
finally:
251+
thread_detached.release()
252+
253+
with threading_helper.wait_threads_exit():
254+
handle = thread.start_joinable_thread(task)
255+
handles.append(handle)
256+
start_joinable_thread_returned.release()
257+
thread_detached.acquire()
258+
with self.assertRaisesRegex(ValueError, "not joinable"):
259+
handle.join()
260+
261+
assert len(errors) == 0
262+
263+
def test_detach_then_join(self):
264+
lock = thread.allocate_lock()
265+
lock.acquire()
266+
267+
def task():
268+
lock.acquire()
269+
270+
with threading_helper.wait_threads_exit():
271+
handle = thread.start_joinable_thread(task)
272+
# detach() returns even though the thread is blocked on lock
273+
handle.detach()
274+
# join() then cannot be called anymore
275+
with self.assertRaisesRegex(ValueError, "not joinable"):
276+
handle.join()
277+
lock.release()
278+
279+
def test_join_then_detach(self):
280+
def task():
281+
pass
282+
283+
with threading_helper.wait_threads_exit():
284+
handle = thread.start_joinable_thread(task)
285+
handle.join()
286+
with self.assertRaisesRegex(ValueError, "not joinable"):
287+
handle.detach()
288+
163289

164290
class Barrier:
165291
def __init__(self, num_threads):

Lib/test/test_threading.py

+44-3
Original file line numberDiff line numberDiff line change
@@ -376,16 +376,16 @@ def test_limbo_cleanup(self):
376376
# Issue 7481: Failure to start thread should cleanup the limbo map.
377377
def fail_new_thread(*args):
378378
raise threading.ThreadError()
379-
_start_new_thread = threading._start_new_thread
380-
threading._start_new_thread = fail_new_thread
379+
_start_joinable_thread = threading._start_joinable_thread
380+
threading._start_joinable_thread = fail_new_thread
381381
try:
382382
t = threading.Thread(target=lambda: None)
383383
self.assertRaises(threading.ThreadError, t.start)
384384
self.assertFalse(
385385
t in threading._limbo,
386386
"Failed to cleanup _limbo map on failure of Thread.start().")
387387
finally:
388-
threading._start_new_thread = _start_new_thread
388+
threading._start_joinable_thread = _start_joinable_thread
389389

390390
def test_finalize_running_thread(self):
391391
# Issue 1402: the PyGILState_Ensure / _Release functions may be called
@@ -482,6 +482,47 @@ def test_enumerate_after_join(self):
482482
finally:
483483
sys.setswitchinterval(old_interval)
484484

485+
def test_join_from_multiple_threads(self):
486+
# Thread.join() should be thread-safe
487+
errors = []
488+
489+
def worker():
490+
time.sleep(0.005)
491+
492+
def joiner(thread):
493+
try:
494+
thread.join()
495+
except Exception as e:
496+
errors.append(e)
497+
498+
for N in range(2, 20):
499+
threads = [threading.Thread(target=worker)]
500+
for i in range(N):
501+
threads.append(threading.Thread(target=joiner,
502+
args=(threads[0],)))
503+
for t in threads:
504+
t.start()
505+
time.sleep(0.01)
506+
for t in threads:
507+
t.join()
508+
if errors:
509+
raise errors[0]
510+
511+
def test_join_with_timeout(self):
512+
lock = _thread.allocate_lock()
513+
lock.acquire()
514+
515+
def worker():
516+
lock.acquire()
517+
518+
thread = threading.Thread(target=worker)
519+
thread.start()
520+
thread.join(timeout=0.01)
521+
assert thread.is_alive()
522+
lock.release()
523+
thread.join()
524+
assert not thread.is_alive()
525+
485526
def test_no_refcycle_through_target(self):
486527
class RunSelfFunction(object):
487528
def __init__(self, should_raise):

0 commit comments

Comments
 (0)