Skip to content

GH-109978: Allow multiprocessing finalizers to run on a separate thread #110510

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,9 +561,9 @@ def shutdown_workers(self):
except queue.Full:
break

def join_executor_internals(self):
def join_executor_internals(self, broken=False):
with self.shutdown_lock:
self._join_executor_internals()
self._join_executor_internals(broken)

def _join_executor_internals(self, broken=False):
# If broken, call_queue was closed and so can no longer be used.
Expand Down Expand Up @@ -759,7 +759,11 @@ def _start_executor_manager_thread(self):
if not self._safe_to_dynamically_spawn_children: # ie, using fork.
self._launch_processes()
self._executor_manager_thread = _ExecutorManagerThread(self)
self._executor_manager_thread.start()
try:
self._executor_manager_thread.start()
except RuntimeError:
self._broken = "Executor manager thread could not be started"
raise BrokenProcessPool(self._broken)
_threads_wakeups[self._executor_manager_thread] = \
self._executor_manager_thread_wakeup

Expand Down Expand Up @@ -860,7 +864,10 @@ def shutdown(self, wait=True, *, cancel_futures=False):
self._executor_manager_thread_wakeup.wakeup()

if self._executor_manager_thread is not None and wait:
self._executor_manager_thread.join()
try:
self._executor_manager_thread.join()
except RuntimeError:
self._executor_manager_thread.join_executor_internals(broken=True)
# To reduce the risk of opening too many files, remove references to
# objects that use file descriptors.
self._executor_manager_thread = None
Expand Down
4 changes: 4 additions & 0 deletions Lib/multiprocessing/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,10 @@ def close(self):
self._listener = None
listener.close()

@property
def closed(self):
return self._listener is None

@property
def address(self):
return self._listener._address
Expand Down
43 changes: 6 additions & 37 deletions Lib/multiprocessing/heap.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,6 @@ def __init__(self, size=mmap.PAGESIZE):
self._allocated_blocks = defaultdict(set)
self._arenas = []

# List of pending blocks to free - see comment in free() below
self._pending_free_blocks = []

# Statistics
self._n_mallocs = 0
self._n_frees = 0
Expand Down Expand Up @@ -255,43 +252,16 @@ def _remove_allocated_block(self, block):
# Arena is entirely free, discard it from this process
self._discard_arena(arena)

def _free_pending_blocks(self):
# Free all the blocks in the pending list - called with the lock held.
while True:
try:
block = self._pending_free_blocks.pop()
except IndexError:
break
self._add_free_block(block)
self._remove_allocated_block(block)

def free(self, block):
# free a block returned by malloc()
# Since free() can be called asynchronously by the GC, it could happen
# that it's called while self._lock is held: in that case,
# self._lock.acquire() would deadlock (issue #12352). To avoid that, a
# trylock is used instead, and if the lock can't be acquired
# immediately, the block is added to a list of blocks to be freed
# synchronously sometimes later from malloc() or free(), by calling
# _free_pending_blocks() (appending and retrieving from a list is not
# strictly thread-safe but under CPython it's atomic thanks to the GIL).
if os.getpid() != self._lastpid:
raise ValueError(
"My pid ({0:n}) is not last pid {1:n}".format(
os.getpid(),self._lastpid))
if not self._lock.acquire(False):
# can't acquire the lock right now, add the block to the list of
# pending blocks to free
self._pending_free_blocks.append(block)
else:
# we hold the lock
try:
self._n_frees += 1
self._free_pending_blocks()
self._add_free_block(block)
self._remove_allocated_block(block)
finally:
self._lock.release()
with self._lock:
self._n_frees += 1
self._add_free_block(block)
self._remove_allocated_block(block)

def malloc(self, size):
# return a block of right size (possibly rounded up)
Expand All @@ -303,8 +273,6 @@ def malloc(self, size):
self.__init__() # reinitialize after fork
with self._lock:
self._n_mallocs += 1
# allow pending blocks to be marked available
self._free_pending_blocks()
size = self._roundup(max(size, 1), self._alignment)
(arena, start, stop) = self._malloc(size)
real_stop = start + size
Expand All @@ -330,7 +298,8 @@ def __init__(self, size):
raise OverflowError("Size {0:n} too large".format(size))
block = BufferWrapper._heap.malloc(size)
self._state = (block, size)
util.Finalize(self, BufferWrapper._heap.free, args=(block,))
util.Finalize(self, BufferWrapper._heap.free, args=(block,),
reentrant=False)

def create_memoryview(self):
(arena, start, stop), size = self._state
Expand Down
8 changes: 4 additions & 4 deletions Lib/multiprocessing/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,15 @@ def serve_forever(self):
except (KeyboardInterrupt, SystemExit):
pass
finally:
self.listener.close()
if sys.stdout != sys.__stdout__: # what about stderr?
util.debug('resetting stdout, stderr')
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
sys.exit(0)

def accepter(self):
while True:
while not self.listener.closed:
try:
c = self.listener.accept()
except OSError:
Expand Down Expand Up @@ -575,7 +576,7 @@ def start(self, initializer=None, initargs=()):
self, type(self)._finalize_manager,
args=(self._process, self._address, self._authkey, self._state,
self._Client, self._shutdown_timeout),
exitpriority=0
exitpriority=0, reentrant=False
)

@classmethod
Expand Down Expand Up @@ -859,12 +860,11 @@ def _incref(self):
self._idset.add(self._id)

state = self._manager and self._manager._state

self._close = util.Finalize(
self, BaseProxy._decref,
args=(self._token, self._authkey, state,
self._tls, self._idset, self._Client),
exitpriority=10
exitpriority=10, reentrant=False
)

@staticmethod
Expand Down
7 changes: 4 additions & 3 deletions Lib/multiprocessing/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ def __init__(self, processes=None, initializer=None, initargs=(),
p.terminate()
for p in self._pool:
p.join()
self._pool.clear()
raise

sentinels = self._get_sentinels()
Expand Down Expand Up @@ -257,7 +258,7 @@ def __init__(self, processes=None, initializer=None, initargs=(),
args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
self._change_notifier, self._worker_handler, self._task_handler,
self._result_handler, self._cache),
exitpriority=15
exitpriority=15, reentrant=False
)
self._state = RUN

Expand Down Expand Up @@ -665,8 +666,8 @@ def join(self):
self._worker_handler.join()
self._task_handler.join()
self._result_handler.join()
for p in self._pool:
p.join()
while self._pool:
self._pool.pop().join()
Comment on lines +669 to +670
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference?


@staticmethod
def _help_stuff_finish(inqueue, task_handler, size):
Expand Down
4 changes: 2 additions & 2 deletions Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,14 @@ def _start_thread(self):
self._jointhread = Finalize(
self._thread, Queue._finalize_join,
[weakref.ref(self._thread)],
exitpriority=-5
exitpriority=-5, reentrant=False
)

# Send sentinel to the thread queue object when garbage collected
self._close = Finalize(
self, Queue._finalize_close,
[self._buffer, self._notempty],
exitpriority=10
exitpriority=10, reentrant=False
)

@staticmethod
Expand Down
2 changes: 1 addition & 1 deletion Lib/multiprocessing/synchronize.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def _after_fork(obj):
from .resource_tracker import register
register(self._semlock.name, "semaphore")
util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
exitpriority=0)
exitpriority=0, reentrant=False)

@staticmethod
def _cleanup(name):
Expand Down
Loading