2 changes: 1 addition & 1 deletion README.md
@@ -43,7 +43,7 @@ def calc(i):
return i, size, rank

with flux.job.FluxExecutor() as flux_exe:
with Executor(max_cores=2, cores_per_worker=2, executor=flux_exe) as exe:
with Executor(max_cores=2, executor=flux_exe, resource_dict={"cores": 2}) as exe:
⚠️ Potential issue

Update the explanatory text to match the new API.

The example code has been updated to use resource_dict={"cores": 2}, but the explanatory text below still references the old cores_per_worker=2 parameter. This inconsistency could confuse users.

Consider updating the paragraph that starts with "The interface of the standard..." to reflect the new API:

-The interface of the standard [concurrent.futures.Executor](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures) 
-is extended by adding the option `cores_per_worker=2` to assign multiple MPI ranks to each function call.
+The interface of the standard [concurrent.futures.Executor](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures) 
+is extended by adding the option `resource_dict={"cores": 2}` to assign multiple MPI ranks to each function call.

Suggested change
with Executor(max_cores=2, executor=flux_exe, resource_dict={"cores": 2}) as exe:
The interface of the standard [concurrent.futures.Executor](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)
is extended by adding the option `resource_dict={"cores": 2}` to assign multiple MPI ranks to each function call.

fs = exe.submit(calc, 3)
print(fs.result())
```
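For orientation, the keys the new `resource_dict` argument actually consumes in this PR are the ones seeded by `default_resource_dict` in `executorlib/__init__.py` below: "cores", "threads_per_core", "gpus_per_core", "cwd", "openmpi_oversubscribe" and "slurm_cmd_args". A minimal sketch of a call that sets several of them, reusing the `calc()` function and imports from the README example above (values are illustrative only):

```python
# Sketch only: relies on calc() and the imports from the README snippet above.
# The resource_dict keys mirror default_resource_dict introduced in this PR.
with flux.job.FluxExecutor() as flux_exe:
    with Executor(
        max_cores=2,
        executor=flux_exe,
        resource_dict={
            "cores": 2,                      # MPI ranks per function call
            "threads_per_core": 1,           # OpenMP threads per rank
            "openmpi_oversubscribe": False,  # do not pass --oversubscribe
        },
    ) as exe:
        fs = exe.submit(calc, 3)
        print(fs.result())
```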
70 changes: 34 additions & 36 deletions executorlib/__init__.py
@@ -36,12 +36,14 @@ class Executor:
backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
max_cores (int): defines the number cores which can be used in parallel
cores_per_worker (int): number of MPI cores to be used for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_worker (int): number of GPUs per worker - defaults to 0
cwd (str/None): current working directory where the parallel python task is executed
openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and SLURM only) - default False
slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores_per_worker (int): number of MPI cores to be used for each function call
- threads_per_core (int): number of OpenMP threads to be used for each function call
- gpus_per_worker (int): number of GPUs per worker - defaults to 0
- cwd (str/None): current working directory where the parallel python task is executed
- openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
SLURM only) - default False
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
@@ -89,12 +91,7 @@ def __init__(
backend: str = "local",
cache_directory: Optional[str] = None,
max_cores: int = 1,
cores_per_worker: int = 1,
threads_per_core: int = 1,
gpus_per_worker: int = 0,
cwd: Optional[str] = None,
openmpi_oversubscribe: bool = False,
slurm_cmd_args: list[str] = [],
resource_dict: Optional[dict] = None,
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
@@ -114,12 +111,7 @@ def __new__(
backend: str = "local",
cache_directory: Optional[str] = None,
max_cores: int = 1,
cores_per_worker: int = 1,
threads_per_core: int = 1,
gpus_per_worker: int = 0,
cwd: Optional[str] = None,
openmpi_oversubscribe: bool = False,
slurm_cmd_args: list[str] = [],
resource_dict: Optional[dict] = None,
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
@@ -145,12 +137,15 @@ def __new__(
backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
max_cores (int): defines the number cores which can be used in parallel
cores_per_worker (int): number of MPI cores to be used for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_worker (int): number of GPUs per worker - defaults to 0
openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and SLURM only) - default False
slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
cwd (str/None): current working directory where the parallel python task is executed
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores_per_worker (int): number of MPI cores to be used for each function call
- threads_per_core (int): number of OpenMP threads to be used for each function call
- gpus_per_worker (int): number of GPUs per worker - defaults to 0
- cwd (str/None): current working directory where the parallel python task is executed
- openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
and SLURM only) - default False
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
only)
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
@@ -172,18 +167,26 @@ def __new__(
debugging purposes and to get an overview of the specified dependencies.

"""
default_resource_dict = {
"cores": 1,
"threads_per_core": 1,
"gpus_per_core": 0,
"cwd": None,
"openmpi_oversubscribe": False,
"slurm_cmd_args": [],
}
if resource_dict is None:
resource_dict = {}
resource_dict.update(
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
)
Comment on lines +178 to +182
⚠️ Potential issue

Add validation for resource_dict values.

The code accepts arbitrary values without validation, which could lead to resource exhaustion or other security issues. Consider adding validation for:

  • Negative values for cores, threads, and GPUs
  • Unreasonably large values that could impact system stability
  • Type checking for numeric parameters

Example implementation:

MAX_ALLOWED_VALUE = 1024  # illustrative upper bound; choose limits that match the target machines

def validate_resource_dict(resource_dict):
    """Reject non-integer, negative, or implausibly large resource requests."""
    for key in ['cores_per_worker', 'threads_per_core', 'gpus_per_worker']:
        if key in resource_dict:
            value = resource_dict[key]
            if not isinstance(value, int):
                raise TypeError(f"{key} must be an integer")
            if value < 0:
                raise ValueError(f"{key} cannot be negative")
            if value > MAX_ALLOWED_VALUE:
                raise ValueError(f"{key} exceeds the maximum allowed value ({MAX_ALLOWED_VALUE})")

Would you like me to provide a complete implementation with appropriate resource limits?
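If a check along these lines were adopted, the natural call site would presumably be in `Executor.__new__`, just before the defaults are merged in; the placement below is an assumption, and `validate_resource_dict` is the helper sketched above:

```python
# Hypothetical wiring inside Executor.__new__ (sketch):
if resource_dict is None:
    resource_dict = {}
validate_resource_dict(resource_dict)  # fail fast on non-integer, negative, or oversized values
resource_dict.update(
    {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
)
```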

Comment on lines +170 to +182
⚠️ Potential issue

Fix inconsistent parameter naming in default_resource_dict.

The keys in default_resource_dict don't match the documented parameter names:

  • "cores" vs documented "cores_per_worker"
  • "gpus_per_core" vs documented "gpus_per_worker"

Apply this fix:

 default_resource_dict = {
-    "cores": 1,
-    "gpus_per_core": 0,
+    "cores_per_worker": 1,
+    "gpus_per_worker": 0,
     "threads_per_core": 1,
     "cwd": None,
     "openmpi_oversubscribe": False,
     "slurm_cmd_args": [],
 }

Suggested change
default_resource_dict = {
"cores": 1,
"threads_per_core": 1,
"gpus_per_core": 0,
"cwd": None,
"openmpi_oversubscribe": False,
"slurm_cmd_args": [],
}
if resource_dict is None:
resource_dict = {}
resource_dict.update(
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
)
default_resource_dict = {
"cores_per_worker": 1,
"threads_per_core": 1,
"gpus_per_worker": 0,
"cwd": None,
"openmpi_oversubscribe": False,
"slurm_cmd_args": [],
}
if resource_dict is None:
resource_dict = {}
resource_dict.update(
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
)
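One way to keep the documented names and the defaults from drifting apart again would be to hoist the defaults out of `__new__` into a module-level constant and pin the key set with a test. This is only a sketch: the PR keeps `default_resource_dict` as a local variable, and `DEFAULT_RESOURCE_DICT` below is a hypothetical constant that does not exist yet.

```python
# Hypothetical: assumes the defaults were exposed as executorlib.DEFAULT_RESOURCE_DICT.
DOCUMENTED_KEYS = {
    "cores_per_worker",
    "threads_per_core",
    "gpus_per_worker",
    "cwd",
    "openmpi_oversubscribe",
    "slurm_cmd_args",
}


def test_default_resource_dict_matches_docstring():
    from executorlib import DEFAULT_RESOURCE_DICT  # hypothetical module-level constant

    assert set(DEFAULT_RESOURCE_DICT) == DOCUMENTED_KEYS
```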

if not disable_dependencies:
return ExecutorWithDependencies(
max_workers=max_workers,
backend=backend,
cache_directory=cache_directory,
max_cores=max_cores,
cores_per_worker=cores_per_worker,
threads_per_core=threads_per_core,
gpus_per_worker=gpus_per_worker,
cwd=cwd,
openmpi_oversubscribe=openmpi_oversubscribe,
slurm_cmd_args=slurm_cmd_args,
resource_dict=resource_dict,
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
@@ -201,12 +204,7 @@ def __new__(
backend=backend,
cache_directory=cache_directory,
max_cores=max_cores,
cores_per_worker=cores_per_worker,
threads_per_core=threads_per_core,
gpus_per_worker=gpus_per_worker,
cwd=cwd,
openmpi_oversubscribe=openmpi_oversubscribe,
slurm_cmd_args=slurm_cmd_args,
resource_dict=resource_dict,
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
80 changes: 38 additions & 42 deletions executorlib/interactive/executor.py
@@ -151,12 +151,7 @@ def create_executor(
backend: str = "local",
max_cores: int = 1,
cache_directory: Optional[str] = None,
cores_per_worker: int = 1,
threads_per_core: int = 1,
gpus_per_worker: int = 0,
cwd: Optional[str] = None,
openmpi_oversubscribe: bool = False,
slurm_cmd_args: list[str] = [],
resource_dict: Optional[dict] = None,
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
@@ -179,12 +174,14 @@ def create_executor(
backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
max_cores (int): defines the number cores which can be used in parallel
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
cores_per_worker (int): number of MPI cores to be used for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_worker (int): number of GPUs per worker - defaults to 0
cwd (str/None): current working directory where the parallel python task is executed
openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and SLURM only) - default False
slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores_per_worker (int): number of MPI cores to be used for each function call
- threads_per_core (int): number of OpenMP threads to be used for each function call
- gpus_per_worker (int): number of GPUs per worker - defaults to 0
- cwd (str/None): current working directory where the parallel python task is executed
- openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
SLURM only) - default False
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
Comment on lines +177 to +184
⚠️ Potential issue

Inconsistent key names in resource_dict

The keys used in resource_dict are inconsistent between the documentation, default values, and usage in the code:

  • In the docstring (lines 177-184), keys like cores_per_worker and gpus_per_worker are documented.
  • In default_resource_dict (lines 205-211), the keys are cores and gpus_per_core.
  • In usage (lines 218-219, 263-264), keys like cores_per_worker and gpus_per_worker are expected.

This inconsistency can lead to confusion and potential bugs since the keys may not match as intended.

To resolve this inconsistency, align the key names across the docstring, default values, and code usage. Update default_resource_dict and related code to use consistent key names:

 default_resource_dict = {
-    "cores": 1,
+    "cores_per_worker": 1,
     "threads_per_core": 1,
-    "gpus_per_core": 0,
+    "gpus_per_worker": 0,
     "cwd": None,
     "openmpi_oversubscribe": False,
     "slurm_cmd_args": [],
 }

Update the usage in the code accordingly:

-cores_per_worker = resource_dict["cores"]
+cores_per_worker = resource_dict["cores_per_worker"]

...

-check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
+check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_worker"])

Ensure that all references to these keys are updated for consistency.

Also applies to: 205-211, 218-219, 263-264
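To make the mismatch concrete, the following self-contained snippet reruns the merge logic from this PR with the documented key names supplied by a caller; only the input dictionary is invented, the rest is copied from `Executor.__new__`:

```python
# Merge logic copied from Executor.__new__ in this PR; toy input values.
default_resource_dict = {
    "cores": 1,
    "threads_per_core": 1,
    "gpus_per_core": 0,
    "cwd": None,
    "openmpi_oversubscribe": False,
    "slurm_cmd_args": [],
}
resource_dict = {"cores_per_worker": 2, "gpus_per_worker": 1}  # keys as documented in the docstrings
resource_dict.update(
    {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
)
print(resource_dict["cores"])  # 1 -- the documented "cores_per_worker": 2 never reaches "cores"
```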

flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
@@ -205,70 +202,69 @@ def create_executor(
if flux_executor is not None and backend != "flux":
backend = "flux"
check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
executor_kwargs = {
"cores": cores_per_worker,
"hostname_localhost": hostname_localhost,
"cwd": cwd,
"cache_directory": cache_directory,
}
cores_per_worker = resource_dict["cores"]
resource_dict["cache_directory"] = cache_directory
resource_dict["hostname_localhost"] = hostname_localhost
if backend == "flux":
check_oversubscribe(oversubscribe=openmpi_oversubscribe)
check_command_line_argument_lst(command_line_argument_lst=slurm_cmd_args)
executor_kwargs["threads_per_core"] = threads_per_core
executor_kwargs["gpus_per_core"] = int(gpus_per_worker / cores_per_worker)
executor_kwargs["flux_executor"] = flux_executor
executor_kwargs["flux_executor_pmi_mode"] = flux_executor_pmi_mode
executor_kwargs["flux_executor_nesting"] = flux_executor_nesting
check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
check_command_line_argument_lst(
command_line_argument_lst=resource_dict["slurm_cmd_args"]
)
del resource_dict["openmpi_oversubscribe"]
del resource_dict["slurm_cmd_args"]
resource_dict["flux_executor"] = flux_executor
resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
resource_dict["flux_executor_nesting"] = flux_executor_nesting
if block_allocation:
executor_kwargs["init_function"] = init_function
resource_dict["init_function"] = init_function
return InteractiveExecutor(
max_workers=int(max_cores / cores_per_worker),
executor_kwargs=executor_kwargs,
executor_kwargs=resource_dict,
spawner=FluxPythonSpawner,
)
else:
return InteractiveStepExecutor(
max_cores=max_cores,
executor_kwargs=executor_kwargs,
executor_kwargs=resource_dict,
spawner=FluxPythonSpawner,
)
elif backend == "slurm":
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
executor_kwargs["threads_per_core"] = threads_per_core
executor_kwargs["gpus_per_core"] = int(gpus_per_worker / cores_per_worker)
executor_kwargs["slurm_cmd_args"] = slurm_cmd_args
executor_kwargs["openmpi_oversubscribe"] = openmpi_oversubscribe
if block_allocation:
executor_kwargs["init_function"] = init_function
resource_dict["init_function"] = init_function
return InteractiveExecutor(
max_workers=int(max_cores / cores_per_worker),
executor_kwargs=executor_kwargs,
executor_kwargs=resource_dict,
spawner=SrunSpawner,
)
else:
return InteractiveStepExecutor(
max_cores=max_cores,
executor_kwargs=executor_kwargs,
executor_kwargs=resource_dict,
spawner=SrunSpawner,
)
else: # backend="local"
check_threads_per_core(threads_per_core=threads_per_core)
check_gpus_per_worker(gpus_per_worker=gpus_per_worker)
check_command_line_argument_lst(command_line_argument_lst=slurm_cmd_args)
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
executor_kwargs["openmpi_oversubscribe"] = openmpi_oversubscribe
check_threads_per_core(threads_per_core=resource_dict["threads_per_core"])
check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
⚠️ Potential issue

Incorrect key used for gpus_per_worker in local backend

In the call to check_gpus_per_worker, gpus_per_worker=resource_dict["gpus_per_core"] is passed. However, the key should be gpus_per_worker to align with the expected parameter.

Apply this diff to correct the key:

-check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
+check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_worker"])

Suggested change
check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_worker"])

check_command_line_argument_lst(
command_line_argument_lst=resource_dict["slurm_cmd_args"]
)
del resource_dict["threads_per_core"]
del resource_dict["gpus_per_core"]
del resource_dict["slurm_cmd_args"]
if block_allocation:
executor_kwargs["init_function"] = init_function
resource_dict["init_function"] = init_function
return InteractiveExecutor(
max_workers=int(max_cores / cores_per_worker),
executor_kwargs=executor_kwargs,
executor_kwargs=resource_dict,
spawner=MpiExecSpawner,
)
else:
return InteractiveStepExecutor(
max_cores=max_cores,
executor_kwargs=executor_kwargs,
executor_kwargs=resource_dict,
spawner=MpiExecSpawner,
)