Skip to content

Commit ef411ef

Browse files
committed
GH-110829: Ensure Thread.join() joins the OS thread
1 parent b2ab210 commit ef411ef

File tree

7 files changed

+369
-23
lines changed

7 files changed

+369
-23
lines changed

Include/pythread.h

+6
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ PyAPI_FUNC(unsigned long) PyThread_start_new_thread(void (*)(void *), void *);
2020
PyAPI_FUNC(void) _Py_NO_RETURN PyThread_exit_thread(void);
2121
PyAPI_FUNC(unsigned long) PyThread_get_thread_ident(void);
2222

23+
PyAPI_FUNC(unsigned long) PyThread_start_joinable_thread(void (*func)(void *),
24+
void *arg,
25+
Py_uintptr_t* handle);
26+
PyAPI_FUNC(int) PyThread_join_thread(Py_uintptr_t);
27+
PyAPI_FUNC(int) PyThread_detach_thread(Py_uintptr_t);
28+
2329
#if (defined(__APPLE__) || defined(__linux__) || defined(_WIN32) \
2430
|| defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) \
2531
|| defined(__DragonFly__) || defined(_AIX))

Lib/test/_test_multiprocessing.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -2578,7 +2578,7 @@ def test_async(self):
25782578
self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
25792579

25802580
def test_async_timeout(self):
2581-
res = self.pool.apply_async(sqr, (6, TIMEOUT2 + support.SHORT_TIMEOUT))
2581+
res = self.pool.apply_async(sqr, (6, 5 * TIMEOUT2))
25822582
get = TimingWrapper(res.get)
25832583
self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
25842584
self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
@@ -2682,6 +2682,9 @@ def test_make_pool(self):
26822682
p.join()
26832683

26842684
def test_terminate(self):
2685+
if self.TYPE == 'threads':
2686+
self.skipTest("Threads cannot be terminated")
2687+
26852688
# Simulate slow tasks which take "forever" to complete
26862689
args = [support.LONG_TIMEOUT for i in range(10_000)]
26872690
result = self.pool.map_async(time.sleep, args, chunksize=1)

Lib/test/test_thread.py

+92
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,98 @@ def task():
160160
"Exception ignored in thread started by")
161161
self.assertIsNotNone(cm.unraisable.exc_traceback)
162162

163+
def test_join_thread(self):
164+
finished = []
165+
166+
def task():
167+
time.sleep(0.05)
168+
finished.append(None)
169+
170+
with threading_helper.wait_threads_exit():
171+
joinable = True
172+
ident = thread.start_new_thread(task, (), {}, joinable)
173+
thread.join_thread(ident)
174+
self.assertEqual(len(finished), 1)
175+
176+
def test_join_thread_already_exited(self):
177+
def task():
178+
pass
179+
180+
with threading_helper.wait_threads_exit():
181+
joinable = True
182+
ident = thread.start_new_thread(task, (), {}, joinable)
183+
time.sleep(0.05)
184+
thread.join_thread(ident)
185+
186+
def test_join_non_joinable(self):
187+
def task():
188+
pass
189+
190+
with threading_helper.wait_threads_exit():
191+
ident = thread.start_new_thread(task, ())
192+
with self.assertRaisesRegex(ValueError, "not joinable"):
193+
thread.join_thread(ident)
194+
195+
def test_join_several_times(self):
196+
def task():
197+
pass
198+
199+
with threading_helper.wait_threads_exit():
200+
joinable = True
201+
ident = thread.start_new_thread(task, (), {}, joinable)
202+
thread.join_thread(ident)
203+
with self.assertRaisesRegex(ValueError, "not joinable"):
204+
thread.join_thread(ident)
205+
206+
def test_join_from_self(self):
207+
errors = []
208+
209+
def task():
210+
ident = thread.get_ident()
211+
try:
212+
thread.join_thread(ident)
213+
except Exception as e:
214+
errors.append(e)
215+
216+
with threading_helper.wait_threads_exit():
217+
joinable = True
218+
ident = thread.start_new_thread(task, (), {}, joinable)
219+
time.sleep(0.05)
220+
# Can still join after join_thread() failed in other thread
221+
thread.join_thread(ident)
222+
223+
assert len(errors) == 1
224+
with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"):
225+
raise errors[0]
226+
227+
def test_detach_then_join(self):
228+
lock = thread.allocate_lock()
229+
lock.acquire()
230+
231+
def task():
232+
lock.acquire()
233+
234+
with threading_helper.wait_threads_exit():
235+
joinable = True
236+
ident = thread.start_new_thread(task, (), {}, joinable)
237+
# detach_thread() returns even though the thread is blocked on lock
238+
thread.detach_thread(ident)
239+
# join_thread() then cannot be called anymore
240+
with self.assertRaisesRegex(ValueError, "not joinable"):
241+
thread.join_thread(ident)
242+
lock.release()
243+
244+
def test_join_then_detach(self):
245+
def task():
246+
pass
247+
248+
with threading_helper.wait_threads_exit():
249+
joinable = True
250+
ident = thread.start_new_thread(task, (), {}, joinable)
251+
thread.join_thread(ident)
252+
with self.assertRaisesRegex(ValueError, "not joinable"):
253+
thread.detach_thread(ident)
254+
163255

164256
class Barrier:
165257
def __init__(self, num_threads):

Lib/threading.py

+33-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import _thread
66
import functools
77
import warnings
8+
import _weakref
89

910
from time import monotonic as _time
1011
from _weakrefset import WeakSet
@@ -34,6 +35,8 @@
3435

3536
# Rename some stuff so "from threading import *" is safe
3637
_start_new_thread = _thread.start_new_thread
38+
_join_thread = _thread.join_thread
39+
_detach_thread = _thread.detach_thread
3740
_daemon_threads_allowed = _thread.daemon_threads_allowed
3841
_allocate_lock = _thread.allocate_lock
3942
_set_sentinel = _thread._set_sentinel
@@ -924,6 +927,7 @@ class is implemented.
924927
if _HAVE_THREAD_NATIVE_ID:
925928
self._native_id = None
926929
self._tstate_lock = None
930+
self._join_lock = None
927931
self._started = Event()
928932
self._is_stopped = False
929933
self._initialized = True
@@ -944,11 +948,14 @@ def _reset_internal_locks(self, is_alive):
944948
if self._tstate_lock is not None:
945949
self._tstate_lock._at_fork_reinit()
946950
self._tstate_lock.acquire()
951+
if self._join_lock is not None:
952+
self._join_lock._at_fork_reinit()
947953
else:
948954
# The thread isn't alive after fork: it doesn't have a tstate
949955
# anymore.
950956
self._is_stopped = True
951957
self._tstate_lock = None
958+
self._join_lock = None
952959

953960
def __repr__(self):
954961
assert self._initialized, "Thread.__init__() was not called"
@@ -980,15 +987,24 @@ def start(self):
980987
if self._started.is_set():
981988
raise RuntimeError("threads can only be started once")
982989

990+
self._join_lock = _allocate_lock()
991+
983992
with _active_limbo_lock:
984993
_limbo[self] = self
985994
try:
986-
_start_new_thread(self._bootstrap, ())
995+
# Start joinable thread
996+
_start_new_thread(self._bootstrap, (), {}, True)
987997
except Exception:
988998
with _active_limbo_lock:
989999
del _limbo[self]
9901000
raise
991-
self._started.wait()
1001+
self._started.wait() # Will set ident and native_id
1002+
1003+
# We need to make sure the OS thread is either explicitly joined or
1004+
# detached at some point, otherwise system resources can be leaked.
1005+
def _finalizer(wr, _detach_thread=_detach_thread, ident=self._ident):
1006+
_detach_thread(ident)
1007+
self._non_joined_finalizer = _weakref.ref(self, _finalizer)
9921008

9931009
def run(self):
9941010
"""Method representing the thread's activity.
@@ -1144,6 +1160,19 @@ def join(self, timeout=None):
11441160
# historically .join(timeout=x) for x<0 has acted as if timeout=0
11451161
self._wait_for_tstate_lock(timeout=max(timeout, 0))
11461162

1163+
if self._is_stopped:
1164+
self._join_os_thread()
1165+
1166+
def _join_os_thread(self):
1167+
join_lock = self._join_lock
1168+
if join_lock is not None:
1169+
# Calling join() multiple times simultaneously would result in early
1170+
# return for one of the callers.
1171+
with join_lock:
1172+
_join_thread(self._ident)
1173+
self._join_lock = None
1174+
self._non_joined_finalizer = None
1175+
11471176
def _wait_for_tstate_lock(self, block=True, timeout=-1):
11481177
# Issue #18808: wait for the thread state to be gone.
11491178
# At the end of the thread's life, after all knowledge of the thread
@@ -1223,6 +1252,8 @@ def is_alive(self):
12231252
if self._is_stopped or not self._started.is_set():
12241253
return False
12251254
self._wait_for_tstate_lock(False)
1255+
if self._is_stopped:
1256+
self._join_os_thread()
12261257
return not self._is_stopped
12271258

12281259
@property

0 commit comments

Comments
 (0)