diff --git a/src/mdio/segy/parsers.py b/src/mdio/segy/parsers.py index 2b13186e..39493d07 100644 --- a/src/mdio/segy/parsers.py +++ b/src/mdio/segy/parsers.py @@ -2,6 +2,7 @@ from __future__ import annotations +import multiprocessing as mp import os from concurrent.futures import ProcessPoolExecutor from itertools import repeat @@ -46,15 +47,18 @@ def parse_index_headers( trace_ranges = [] for idx in range(n_blocks): start, stop = idx * block_size, (idx + 1) * block_size - if stop > trace_count: - stop = trace_count + stop = min(stop, trace_count) trace_ranges.append((start, stop)) + # For Unix async reads with s3fs/fsspec & multiprocessing, + # use 'spawn' instead of default 'fork' to avoid deadlocks + # on cloud stores. Slower but necessary. Default on Windows. num_workers = min(n_blocks, NUM_CPUS) + context = mp.get_context("spawn") tqdm_kw = dict(unit="block", dynamic_ncols=True) - with ProcessPoolExecutor(num_workers) as executor: + with ProcessPoolExecutor(num_workers, mp_context=context) as executor: # pool.imap is lazy lazy_work = executor.map( header_scan_worker, # fn