6 changes: 5 additions & 1 deletion .github/workflows/pipeline.yml
@@ -280,6 +280,8 @@ jobs:
- uses: actions/checkout@v4
- uses: koesterlab/setup-slurm-action@v1
timeout-minutes: 5
- name: Ubuntu install
run: sudo apt install -y mpich
- name: Conda config
shell: bash -l {0}
run: echo -e "channels:\n - conda-forge\n" > .condarc
@@ -295,8 +297,10 @@ jobs:
run: |
pip install . --no-deps --no-build-isolation
cd tests
python -m unittest test_slurmclusterexecutor.py
sinfo -o "%n %e %m %a %c %C"
srun --mpi=list
python -m unittest test_slurmjobexecutor.py
python -m unittest test_slurmclusterexecutor.py

unittest_mpich:
needs: [black]
2 changes: 1 addition & 1 deletion docs/installation.md
@@ -120,7 +120,7 @@ For the version 5 of openmpi the backend changed to `pmix`, this requires the ad
```
conda install -c conda-forge flux-core flux-sched flux-pmix openmpi>=5 executorlib
```
In addition, the `flux_executor_pmi_mode="pmix"` parameter has to be set for the `FluxJobExecutor` or the
In addition, the `pmi_mode="pmix"` parameter has to be set for the `FluxJobExecutor` or the
`FluxClusterExecutor` to switch to `pmix` as backend.
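
For illustration (editorial sketch, not part of the file diff), the renamed parameter would be passed directly to the executor constructor; this assumes `FluxJobExecutor` is importable from the top-level `executorlib` package:

```python
from executorlib import FluxJobExecutor  # top-level import assumed

# pmi_mode="pmix" selects the pmix backend that OpenMPI >= 5 requires
with FluxJobExecutor(pmi_mode="pmix") as exe:
    future = exe.submit(sum, [1, 2, 3])
    print(future.result())  # 6
```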

### Test Flux Framework
25 changes: 14 additions & 11 deletions executorlib/executor/flux.py
@@ -43,8 +43,8 @@ class FluxJobExecutor(BaseExecutor):
compute notes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
@@ -93,8 +93,8 @@ def __init__(
cache_directory: Optional[str] = None,
max_cores: Optional[int] = None,
resource_dict: Optional[dict] = None,
pmi_mode: Optional[str] = None,
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
hostname_localhost: Optional[bool] = None,
@@ -130,8 +130,8 @@ def __init__(
compute notes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions
raised by the Python functions submitted to the Executor.
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
@@ -175,8 +175,8 @@ def __init__(
cache_directory=cache_directory,
max_cores=max_cores,
resource_dict=resource_dict,
pmi_mode=pmi_mode,
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
flux_log_files=flux_log_files,
hostname_localhost=hostname_localhost,
@@ -199,8 +199,8 @@ def __init__(
cache_directory=cache_directory,
max_cores=max_cores,
resource_dict=resource_dict,
pmi_mode=pmi_mode,
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
flux_log_files=flux_log_files,
hostname_localhost=hostname_localhost,
@@ -236,6 +236,7 @@ class FluxClusterExecutor(BaseExecutor):
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
@@ -283,6 +284,7 @@ def __init__(
max_cores: Optional[int] = None,
resource_dict: Optional[dict] = None,
pysqa_config_directory: Optional[str] = None,
pmi_mode: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[Callable] = None,
@@ -317,6 +319,7 @@ def __init__(
- error_log_file (str): Name of the error log file to use for storing exceptions
raised by the Python functions submitted to the Executor.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
@@ -366,7 +369,7 @@ def __init__(
cache_directory=cache_directory,
resource_dict=resource_dict,
flux_executor=None,
flux_executor_pmi_mode=None,
pmi_mode=pmi_mode,
flux_executor_nesting=False,
flux_log_files=False,
pysqa_config_directory=pysqa_config_directory,
@@ -384,8 +387,8 @@ def __init__(
cache_directory=cache_directory,
max_cores=max_cores,
resource_dict=resource_dict,
pmi_mode=None,
Contributor

⚠️ Potential issue

Hardcoded None value in plot dependency graph path.

In the FluxClusterExecutor, when plot_dependency_graph is True, pmi_mode is hardcoded to None instead of using the constructor parameter. This may not be the intended behavior.

Apply this diff to fix the issue:

-                        pmi_mode=None,
+                        pmi_mode=pmi_mode,
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pmi_mode=None,
pmi_mode=pmi_mode,
🤖 Prompt for AI Agents
In executorlib/executor/flux.py at line 390, the parameter pmi_mode is hardcoded
to None when plot_dependency_graph is True in FluxClusterExecutor. To fix this,
replace the hardcoded None with the pmi_mode value passed to the constructor so
that the actual intended mode is used instead of always None.

flux_executor=None,
flux_executor_pmi_mode=None,
flux_executor_nesting=False,
flux_log_files=False,
hostname_localhost=hostname_localhost,
@@ -405,8 +408,8 @@ def create_flux_executor(
max_cores: Optional[int] = None,
cache_directory: Optional[str] = None,
resource_dict: Optional[dict] = None,
pmi_mode: Optional[str] = None,
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
hostname_localhost: Optional[bool] = None,
@@ -434,8 +437,8 @@ def create_flux_executor(
compute notes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
@@ -467,7 +470,7 @@ def create_flux_executor(
resource_dict["hostname_localhost"] = hostname_localhost
resource_dict["log_obj_size"] = log_obj_size
check_init_function(block_allocation=block_allocation, init_function=init_function)
check_pmi(backend="flux_allocation", pmi=flux_executor_pmi_mode)
check_pmi(backend="flux_allocation", pmi=pmi_mode)
check_oversubscribe(oversubscribe=resource_dict.get("openmpi_oversubscribe", False))
check_command_line_argument_lst(
command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
@@ -476,8 +479,8 @@ def create_flux_executor(
del resource_dict["openmpi_oversubscribe"]
if "slurm_cmd_args" in resource_dict:
del resource_dict["slurm_cmd_args"]
resource_dict["pmi_mode"] = pmi_mode
resource_dict["flux_executor"] = flux_executor
resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
resource_dict["flux_executor_nesting"] = flux_executor_nesting
resource_dict["flux_log_files"] = flux_log_files
if block_allocation:
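
As a usage note (editorial, not part of the diff), the `FluxClusterExecutor` now exposes a `pmi_mode` argument that was previously hardcoded to `None`; a minimal sketch, assuming top-level imports and a hypothetical cache directory:

```python
from executorlib import FluxClusterExecutor  # top-level import assumed

# pmi_mode="pmix" as documented above; "./file_cache" is a hypothetical path
with FluxClusterExecutor(pmi_mode="pmix", cache_directory="./file_cache") as exe:
    future = exe.submit(sum, [4, 5, 6])
    print(future.result())  # 15
```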
2 changes: 1 addition & 1 deletion executorlib/executor/single.py
@@ -329,7 +329,7 @@ def __init__(
cache_directory=cache_directory,
resource_dict=resource_dict,
flux_executor=None,
flux_executor_pmi_mode=None,
pmi_mode=None,
flux_executor_nesting=False,
flux_log_files=False,
pysqa_config_directory=None,
13 changes: 12 additions & 1 deletion executorlib/executor/slurm.py
@@ -44,6 +44,7 @@ class SlurmClusterExecutor(BaseExecutor):
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
@@ -91,6 +92,7 @@ def __init__(
max_cores: Optional[int] = None,
resource_dict: Optional[dict] = None,
pysqa_config_directory: Optional[str] = None,
pmi_mode: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[Callable] = None,
@@ -125,6 +127,7 @@ def __init__(
- error_log_file (str): Name of the error log file to use for storing exceptions
raised by the Python functions submitted to the Executor.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
@@ -173,8 +176,8 @@ def __init__(
max_cores=max_cores,
cache_directory=cache_directory,
resource_dict=resource_dict,
pmi_mode=pmi_mode,
flux_executor=None,
flux_executor_pmi_mode=None,
flux_executor_nesting=False,
flux_log_files=False,
pysqa_config_directory=pysqa_config_directory,
@@ -232,6 +235,7 @@ class SlurmJobExecutor(BaseExecutor):
compute notes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
@@ -278,6 +282,7 @@ def __init__(
cache_directory: Optional[str] = None,
max_cores: Optional[int] = None,
resource_dict: Optional[dict] = None,
pmi_mode: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[Callable] = None,
@@ -315,6 +320,7 @@ def __init__(
compute notes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions
raised by the Python functions submitted to the Executor.
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
@@ -356,6 +362,7 @@ def __init__(
cache_directory=cache_directory,
max_cores=max_cores,
resource_dict=resource_dict,
pmi_mode=pmi_mode,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
@@ -376,6 +383,7 @@ def __init__(
cache_directory=cache_directory,
max_cores=max_cores,
resource_dict=resource_dict,
pmi_mode=pmi_mode,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
@@ -389,6 +397,7 @@ def create_slurm_executor(
max_cores: Optional[int] = None,
cache_directory: Optional[str] = None,
resource_dict: Optional[dict] = None,
pmi_mode: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[Callable] = None,
@@ -418,6 +427,7 @@ def create_slurm_executor(
compute notes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
@@ -441,6 +451,7 @@ def create_slurm_executor(
resource_dict["cache_directory"] = cache_directory
resource_dict["hostname_localhost"] = hostname_localhost
resource_dict["log_obj_size"] = log_obj_size
resource_dict["pmi_mode"] = pmi_mode
check_init_function(block_allocation=block_allocation, init_function=init_function)
if block_allocation:
resource_dict["init_function"] = init_function
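
For illustration (editorial sketch, not part of the diff), the new `pmi_mode` parameter on the SLURM executors, assuming top-level imports and a hypothetical cache directory:

```python
from executorlib import SlurmJobExecutor, SlurmClusterExecutor  # top-level imports assumed

# Per the docstring, pmi_mode selects the PMI interface (OpenMPI v5 requires "pmix")
with SlurmJobExecutor(pmi_mode="pmix") as exe:
    print(exe.submit(sum, [1, 2, 3]).result())  # 6

# The same parameter is threaded through create_slurm_executor for the cluster variant
with SlurmClusterExecutor(pmi_mode="pmix", cache_directory="./file_cache") as exe:
    print(exe.submit(sum, [4, 5, 6]).result())  # 15
```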
42 changes: 36 additions & 6 deletions executorlib/standalone/command.py
@@ -1,6 +1,7 @@
import importlib.util
import os
import sys
from typing import Optional


def get_command_path(executable: str) -> str:
@@ -16,24 +17,53 @@ def get_command_path(executable: str) -> str:
return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable))


def get_cache_execute_command(file_name: str, cores: int = 1) -> list:
def get_cache_execute_command(
file_name: str,
cores: int = 1,
backend: Optional[str] = None,
pmi_mode: Optional[str] = None,
) -> list:
"""
Get command to call backend as a list of two strings

Args:
file_name (str): The name of the file.
cores (int, optional): Number of cores used to execute the task. Defaults to 1.
backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)

Returns:
list[str]: List of strings containing the python executable path and the backend script to execute
"""
command_lst = [sys.executable]
if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
command_lst = (
["mpiexec", "-n", str(cores)]
+ command_lst
+ [get_command_path(executable="cache_parallel.py"), file_name]
)
if backend is None:
command_lst = (
["mpiexec", "-n", str(cores)]
+ command_lst
+ [get_command_path(executable="cache_parallel.py"), file_name]
)
elif backend == "slurm":
command_prepend = ["srun", "-n", str(cores)]
if pmi_mode is not None:
command_prepend += ["--mpi=" + pmi_mode]
command_lst = (
command_prepend
+ command_lst
+ [get_command_path(executable="cache_parallel.py"), file_name]
)
elif backend == "flux":
flux_command = ["flux", "run"]
if pmi_mode is not None:
flux_command += ["-o", "pmi=" + pmi_mode]
command_lst = (
flux_command
+ ["-n", str(cores)]
+ command_lst
+ [get_command_path(executable="cache_parallel.py"), file_name]
)
else:
raise ValueError(f"backend should be None, slurm or flux, not {backend}")
elif cores > 1:
raise ImportError(
"mpi4py is required for parallel calculations. Please install mpi4py."
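
For reference (editorial note, not part of the diff), a sketch of the command lists the updated `get_cache_execute_command` helper builds for the two new backends, assuming `mpi4py` is installed and using a hypothetical cache file name:

```python
from executorlib.standalone.command import get_cache_execute_command

# "task.h5" is a hypothetical file name; <python> stands for sys.executable
print(get_cache_execute_command(file_name="task.h5", cores=4, backend="slurm", pmi_mode="pmix"))
# ['srun', '-n', '4', '--mpi=pmix', '<python>', '.../backend/cache_parallel.py', 'task.h5']

print(get_cache_execute_command(file_name="task.h5", cores=4, backend="flux", pmi_mode="pmix"))
# ['flux', 'run', '-o', 'pmi=pmix', '-n', '4', '<python>', '.../backend/cache_parallel.py', 'task.h5']
```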