@@ -179,14 +179,16 @@ class InteractiveStepExecutor(ExecutorBase):
179179
180180 def __init__ (
181181 self ,
182- max_cores : int = 1 ,
182+ max_cores : Optional [int ] = None ,
183+ max_workers : Optional [int ] = None ,
183184 executor_kwargs : dict = {},
184185 spawner : BaseSpawner = MpiExecSpawner ,
185186 ):
186187 super ().__init__ (max_cores = executor_kwargs .get ("max_cores" , None ))
187188 executor_kwargs ["future_queue" ] = self ._future_queue
188189 executor_kwargs ["spawner" ] = spawner
189190 executor_kwargs ["max_cores" ] = max_cores
191+ executor_kwargs ["max_workers" ] = max_workers
190192 self ._set_process (
191193 RaisingThread (
192194 target = execute_separate_tasks ,
@@ -256,7 +258,8 @@ def execute_parallel_tasks(
256258def execute_separate_tasks (
257259 future_queue : queue .Queue ,
258260 spawner : BaseSpawner = MpiExecSpawner ,
259- max_cores : int = 1 ,
261+ max_cores : Optional [int ] = None ,
262+ max_workers : Optional [int ] = None ,
260263 hostname_localhost : Optional [bool ] = None ,
261264 ** kwargs ,
262265):
@@ -267,6 +270,9 @@ def execute_separate_tasks(
267270 future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
268271 spawner (BaseSpawner): Interface to start process on selected compute resources
269272 max_cores (int): defines the number cores which can be used in parallel
273+ max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
274+ cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
275+ recommended, as computers have a limited number of compute cores.
270276 hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
271277 context of an HPC cluster this essential to be able to communicate to an
272278 Executor running on a different compute node within the same allocation. And
@@ -296,6 +302,7 @@ def execute_separate_tasks(
296302 spawner = spawner ,
297303 executor_kwargs = kwargs ,
298304 max_cores = max_cores ,
305+ max_workers = max_workers ,
299306 hostname_localhost = hostname_localhost ,
300307 )
301308 qtask_lst .append (qtask )
@@ -389,7 +396,10 @@ def _get_backend_path(
389396
390397
391398def _wait_for_free_slots (
392- active_task_dict : dict , cores_requested : int , max_cores : int
399+ active_task_dict : dict ,
400+ cores_requested : int ,
401+ max_cores : Optional [int ] = None ,
402+ max_workers : Optional [int ] = None ,
393403) -> dict :
394404 """
395405 Wait for available computing resources to become available.
@@ -398,12 +408,23 @@ def _wait_for_free_slots(
398408 active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
399409 cores_requested (int): Number of cores required for executing the next task
400410 max_cores (int): Maximum number cores which can be used
411+ max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
412+ cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
413+ recommended, as computers have a limited number of compute cores.
401414
402415 Returns:
403416 dict: Dictionary containing the future objects and the number of cores they require
404417 """
405- while sum (active_task_dict .values ()) + cores_requested > max_cores :
406- active_task_dict = {k : v for k , v in active_task_dict .items () if not k .done ()}
418+ if max_cores is not None :
419+ while sum (active_task_dict .values ()) + cores_requested > max_cores :
420+ active_task_dict = {
421+ k : v for k , v in active_task_dict .items () if not k .done ()
422+ }
423+ elif max_workers is not None and max_cores is None :
424+ while len (active_task_dict .values ()) + 1 > max_workers :
425+ active_task_dict = {
426+ k : v for k , v in active_task_dict .items () if not k .done ()
427+ }
407428 return active_task_dict
408429
409430
@@ -490,7 +511,8 @@ def _submit_function_to_separate_process(
490511 qtask : queue .Queue ,
491512 spawner : BaseSpawner ,
492513 executor_kwargs : dict ,
493- max_cores : int = 1 ,
514+ max_cores : Optional [int ] = None ,
515+ max_workers : Optional [int ] = None ,
494516 hostname_localhost : Optional [bool ] = None ,
495517):
496518 """
@@ -503,6 +525,9 @@ def _submit_function_to_separate_process(
503525 spawner (BaseSpawner): Interface to start process on selected compute resources
504526 executor_kwargs (dict): keyword parameters used to initialize the Executor
505527 max_cores (int): defines the number cores which can be used in parallel
528+ max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
529+ cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
530+ recommended, as computers have a limited number of compute cores.
506531 hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
507532 context of an HPC cluster this essential to be able to communicate to an
508533 Executor running on a different compute node within the same allocation. And
@@ -525,6 +550,7 @@ def _submit_function_to_separate_process(
525550 active_task_dict = active_task_dict ,
526551 cores_requested = resource_dict ["cores" ],
527552 max_cores = max_cores ,
553+ max_workers = max_workers ,
528554 )
529555 active_task_dict [task_dict ["future" ]] = resource_dict ["cores" ]
530556 task_kwargs = executor_kwargs .copy ()
0 commit comments