
concurrent.futures.ProcessPoolExecutor pool deadlocks when submitting many tasks #105829

Closed
@kaddkaka

Description

Bug report

Submitting many tasks to a concurrent.futures.ProcessPoolExecutor pool
deadlocks with all three start methods.

When running the same example with multiprocessing.pool.Pool, we have NOT been
able to cause a deadlock. (A reconstructed sketch of that comparison follows.)
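
For reference, here is a sketch of the multiprocessing.pool.Pool variant. This
is a reconstruction matching the example script further down, not the exact
code we ran; task() stands in for the same dummy work.

import multiprocessing

NUM_TASKS = 500000
NUM_PROCESSES = 4
START_METHOD = "spawn"


def task(task_idx, task_name):
    """ Same dummy work as in the example script below """


def main():
    ctx = multiprocessing.get_context(START_METHOD)
    with ctx.Pool(processes=NUM_PROCESSES) as pool:
        # Pool.apply_async mirrors ProcessPoolExecutor.submit here.
        async_results = [pool.apply_async(task, (i, f"task{i}"))
                         for i in range(NUM_TASKS)]
        # Collecting the results has always completed for us; no deadlock seen.
        results = [r.get() for r in async_results]
    print(f"collected {len(results)} results")


if __name__ == "__main__":
    main()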

Several parameters affect how likely it is to get a deadlock:

  1. All three start methods (spawn, fork, and forkserver) exhibit the deadlock
    (the examples below use the spawn method).
  2. It is possible to get a deadlock with NUM_PROCESSES anywhere from 1 to 24.
  3. As long as NUM_TASKS is high, TASK_DATA and TASK_SIZE can be low or removed
    and still cause a hang (see the example script below).
  4. Set DO_PRINT = False for a higher probability of hanging.

Example stack trace excerpts from hung runs:

  1. reading the queue:

    • 1 thread stuck at:
      read (libpthread-2.27.so)
      recv_bytes (multiprocessing/connection.py:221)
      get (multiprocessing/queues.py:103)
      
    • other threads stuck at:
      do_futex_wait.constprop.1 (libpthread-2.27.so)
      _multiprocessing_SemLock_acquire_impl (semaphore.c:355)
      get (multiprocessing/queues.py:102)
      
  2. writing the queue:

    • 1 thread stuck at:
      write (libpthread-2.27.so)
      send_bytes (multiprocessing/connection.py:205)
      put (multiprocessing/queues.py:377)
      
    • other threads stuck at:
      do_futex_wait.constprop.1 (libpthread-2.27.so)
      _multiprocessing_SemLock_acquire_impl (semaphore.c:355)
      put (multiprocessing/queues.py:376)
      
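For reference, the Python-level frames in traces like these can be dumped from
a hung process with the standard faulthandler module. A minimal sketch (one way
to capture similar traces, not necessarily how the excerpts above were
obtained):

import faulthandler
import signal

# Register a handler so that `kill -USR1 <pid>` from another shell dumps
# the Python stack of every thread in the (hung) process to stderr.
faulthandler.register(signal.SIGUSR1, all_threads=True)

Note that faulthandler only shows Python frames; native frames such as
read (libpthread) require a native debugger like gdb.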

Example script exhibiting deadlock behavior

#!/usr/bin/env python3
""" Example that hangs with concurrent.futures.ProcessPoolExecutor """

import multiprocessing
import concurrent.futures

# Tweaking parameters
NUM_TASKS = 500000
TASK_DATA = 1
TASK_SIZE = 1
DO_PRINT = True  # Set to False for an almost guaranteed hang
START_METHOD = "spawn"  # Does not seem to matter
NUM_PROCESSES = 4  # multiprocessing.cpu_count()


def main():
    print("Starting pool")
    ctx = multiprocessing.get_context(START_METHOD)
    with concurrent.futures.ProcessPoolExecutor(max_workers=NUM_PROCESSES,
                                                mp_context=ctx) as pool:
        future_results = submit_to_pool(pool)
        print("Collecting results")
        assert False  # Never reached: the submit loop above hangs first
        collect_results(future_results)


def collect_results(future_results):
    return [r.result() for r in future_results]


def submit_to_pool(pool):
    future_results = []
    for task_idx in range(NUM_TASKS):
        if DO_PRINT and task_idx % 20000 == 0:
            # Too much printing here makes the hang go away!!!
            print("\nsubmit", task_idx)

        task_name = f"task{task_idx}" * TASK_DATA
        future_results.append(pool.submit(task, task_idx, task_name))

    return future_results


def task(task_idx, task_name):
    """ Do some dummy work """
    s = ""
    for i in range(TASK_SIZE):
        s += str(i)

    if DO_PRINT:
        # Too much printing here makes the hang go away!!!
        print(".", end="", flush=True)


if __name__ == "__main__":
    main()
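
For completeness, a hypothetical mitigation sketch that caps the number of
in-flight futures using concurrent.futures.wait. MAX_IN_FLIGHT and the bounded
loop are illustrative assumptions (reusing task and TASK_DATA from the script
above); we have not verified that this avoids the underlying bug.

import concurrent.futures

MAX_IN_FLIGHT = 1000  # illustrative cap, not taken from the report above


def submit_bounded(pool, num_tasks):
    """ Submit tasks while keeping at most MAX_IN_FLIGHT futures pending """
    pending = set()
    results = []
    for task_idx in range(num_tasks):
        if len(pending) >= MAX_IN_FLIGHT:
            # Block until at least one future finishes before submitting more.
            done, pending = concurrent.futures.wait(
                pending, return_when=concurrent.futures.FIRST_COMPLETED)
            results.extend(f.result() for f in done)
        task_name = f"task{task_idx}" * TASK_DATA
        pending.add(pool.submit(task, task_idx, task_name))
    done, _ = concurrent.futures.wait(pending)
    results.extend(f.result() for f in done)
    return results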

Environment

  • My environment:
    • Ubuntu 18.04.6 LTS (bionic)
    • Python 3.10.5
  • My colleague's environment:
    • Ubuntu 22.04.2 LTS (jammy)
    • Either:
      • Python 3.10.5
      • Python 3.11.0rc1

Details

Detailed stack traces in comments.
