[minor] Move create_executor() function to separate module #550
@@ -0,0 +1,260 @@

from typing import Callable, Optional, Union

from executorlib.interactive.shared import (
    InteractiveExecutor,
    InteractiveStepExecutor,
)
from executorlib.interactive.slurm import SrunSpawner
from executorlib.interactive.slurm import (
    validate_max_workers as validate_max_workers_slurm,
)
from executorlib.standalone.inputcheck import (
    check_command_line_argument_lst,
    check_executor,
    check_flux_log_files,
    check_gpus_per_worker,
    check_init_function,
    check_nested_flux_executor,
    check_oversubscribe,
    check_pmi,
    validate_number_of_cores,
)
from executorlib.standalone.interactive.spawner import MpiExecSpawner

try:  # The PyFluxExecutor requires flux-base to be installed.
    from executorlib.interactive.flux import FluxPythonSpawner
    from executorlib.interactive.flux import (
        validate_max_workers as validate_max_workers_flux,
    )
except ImportError:
    pass


def create_executor(
    max_workers: Optional[int] = None,
    backend: str = "local",
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    resource_dict: dict = {},
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    flux_log_files: bool = False,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    """
    Instead of returning an executorlib.Executor object, this function returns either an executorlib.mpi.PyMPIExecutor,
    executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor depending on which backend is available. The
    executorlib.flux.PyFluxExecutor is the preferred choice, while the executorlib.mpi.PyMPIExecutor is primarily used
    for development and testing. The executorlib.flux.PyFluxExecutor requires flux-base from the flux-framework to be
    installed and, in addition, flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor
    requires the SLURM workload manager to be installed on the system.

    Args:
        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
            of cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
            recommended, as computers have a limited number of compute cores.
        backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
        max_cores (int): defines the number of cores which can be used in parallel
        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
        resource_dict (dict): A dictionary of resources required by the task, with the following keys:
            - cores (int): number of MPI cores to be used for each function call
            - threads_per_core (int): number of OpenMP threads to be used for each function call
            - gpus_per_core (int): number of GPUs per worker - defaults to 0
            - cwd (str/None): current working directory where the parallel python task is executed
            - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and SLURM only) -
              default False
            - slurm_cmd_args (list): additional command line arguments for the srun call (SLURM only)
        flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix), default is None (Flux only)
        flux_executor_nesting (bool): Provide a hierarchically nested Flux job scheduler inside the submitted function.
        flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
            context of an HPC cluster this is essential to be able to communicate with an Executor running on a
            different compute node within the same allocation. In principle any computer should be able to resolve
            that its own hostname points to the same address as localhost, but macOS >= 12 seems to disable this
            lookup for security reasons, so on macOS this option has to be set to True.
        block_allocation (boolean): To accelerate the submission of a series of python functions with the same
            resource requirements, executorlib supports block allocation. In this case all resources have to be
            defined on the executor, rather than during the submission of the individual function.
        init_function (None): optional function to preset arguments for functions which are submitted later
    """
    check_init_function(block_allocation=block_allocation, init_function=init_function)
    if flux_executor is not None and backend != "flux_allocation":
        backend = "flux_allocation"
    check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
    cores_per_worker = resource_dict.get("cores", 1)
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost
    if backend == "flux_allocation":
        check_oversubscribe(
            oversubscribe=resource_dict.get("openmpi_oversubscribe", False)
        )
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
        )
        return create_flux_allocation_executor(
            max_workers=max_workers,
            max_cores=max_cores,
            cores_per_worker=cores_per_worker,
            resource_dict=resource_dict,
            flux_executor=flux_executor,
            flux_executor_pmi_mode=flux_executor_pmi_mode,
            flux_executor_nesting=flux_executor_nesting,
            flux_log_files=flux_log_files,
            block_allocation=block_allocation,
            init_function=init_function,
        )
    elif backend == "slurm_allocation":
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_flux_log_files(flux_log_files=flux_log_files)
        return create_slurm_allocation_executor(
            max_workers=max_workers,
            max_cores=max_cores,
            cores_per_worker=cores_per_worker,
            resource_dict=resource_dict,
            block_allocation=block_allocation,
            init_function=init_function,
        )
    elif backend == "local":
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_flux_log_files(flux_log_files=flux_log_files)
        check_gpus_per_worker(gpus_per_worker=resource_dict.get("gpus_per_core", 0))
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
        )
        return create_local_executor(
            max_workers=max_workers,
            max_cores=max_cores,
            cores_per_worker=cores_per_worker,
            resource_dict=resource_dict,
            block_allocation=block_allocation,
            init_function=init_function,
        )
    else:
        raise ValueError(
            "The supported backends are slurm_allocation, slurm_submission, flux_allocation, flux_submission and local."
        )


def create_flux_allocation_executor(
    max_workers: Optional[int] = None,
    max_cores: Optional[int] = None,
    cores_per_worker: int = 1,
    resource_dict: dict = {},
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    flux_log_files: bool = False,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    if "openmpi_oversubscribe" in resource_dict.keys():
        del resource_dict["openmpi_oversubscribe"]
    if "slurm_cmd_args" in resource_dict.keys():
        del resource_dict["slurm_cmd_args"]
    resource_dict["flux_executor"] = flux_executor
    resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
    resource_dict["flux_executor_nesting"] = flux_executor_nesting
    resource_dict["flux_log_files"] = flux_log_files
    if block_allocation:
        resource_dict["init_function"] = init_function
        max_workers = validate_number_of_cores(
            max_cores=max_cores,
            max_workers=max_workers,
            cores_per_worker=cores_per_worker,
            set_local_cores=False,
        )
        validate_max_workers_flux(
            max_workers=max_workers,
            cores=cores_per_worker,
            threads_per_core=resource_dict.get("threads_per_core", 1),
        )
Comment on lines +174 to +178

Fix type incompatibility in the validate_max_workers calls. Add a validation step before the calls:

+    if cores_per_worker is None:
+        cores_per_worker = 1
     validate_max_workers_flux(
         max_workers=max_workers,
         cores=cores_per_worker,
         threads_per_core=resource_dict.get("threads_per_core", 1),
     )

Apply the same fix for the validate_max_workers_slurm call. Also applies to: lines 209-213.

🧰 Tools
🪛 GitHub Actions: MyPy
[error] 176-176: Argument "cores" to "validate_max_workers" has incompatible type "int | None"; expected "int"
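For reference, a minimal sketch of how the same guard might look when carried over to the SLURM branch further down in this file. It only mirrors the reviewer's suggestion above and is not part of the committed diff:

# Hypothetical sketch mirroring the reviewer's suggestion, shown out of context:
# ensure cores_per_worker is an int before handing it to the SLURM validation helper.
if cores_per_worker is None:
    cores_per_worker = 1
validate_max_workers_slurm(
    max_workers=max_workers,
    cores=cores_per_worker,
    threads_per_core=resource_dict.get("threads_per_core", 1),
)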

        return InteractiveExecutor(
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=FluxPythonSpawner,
        )
    else:
        return InteractiveStepExecutor(
            max_cores=max_cores,
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=FluxPythonSpawner,
        )


def create_slurm_allocation_executor(
    max_workers: Optional[int] = None,
    max_cores: Optional[int] = None,
    cores_per_worker: int = 1,
    resource_dict: dict = {},
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    if block_allocation:
        resource_dict["init_function"] = init_function
        max_workers = validate_number_of_cores(
            max_cores=max_cores,
            max_workers=max_workers,
            cores_per_worker=cores_per_worker,
            set_local_cores=False,
        )
        validate_max_workers_slurm(
            max_workers=max_workers,
            cores=cores_per_worker,
            threads_per_core=resource_dict.get("threads_per_core", 1),
        )
        return InteractiveExecutor(
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=SrunSpawner,
        )
    else:
        return InteractiveStepExecutor(
            max_cores=max_cores,
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=SrunSpawner,
        )


def create_local_executor(
    max_workers: Optional[int] = None,
    max_cores: Optional[int] = None,
    cores_per_worker: int = 1,
    resource_dict: dict = {},
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
    if "threads_per_core" in resource_dict.keys():
        del resource_dict["threads_per_core"]
    if "gpus_per_core" in resource_dict.keys():
        del resource_dict["gpus_per_core"]
    if "slurm_cmd_args" in resource_dict.keys():
        del resource_dict["slurm_cmd_args"]
    if block_allocation:
        resource_dict["init_function"] = init_function
        return InteractiveExecutor(
            max_workers=validate_number_of_cores(
                max_cores=max_cores,
                max_workers=max_workers,
                cores_per_worker=cores_per_worker,
                set_local_cores=True,
            ),
            executor_kwargs=resource_dict,
            spawner=MpiExecSpawner,
        )
    else:
        return InteractiveStepExecutor(
            max_cores=max_cores,
            max_workers=max_workers,
            executor_kwargs=resource_dict,
            spawner=MpiExecSpawner,
        )
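The docstring above describes the backend switch and the resource_dict keys; the sketch below illustrates how the relocated factory might be called. It is a hedged example: the new module's import path is not shown in this diff, so the import line is a placeholder, and the parameter values are arbitrary.

# Hedged usage sketch. The import path below is a placeholder assumption,
# since the diff does not show where the new module lives.
from executorlib.interactive.create import create_executor  # placeholder path

exe = create_executor(
    backend="local",              # run locally via mpiexec (MpiExecSpawner)
    max_cores=2,                  # upper bound on cores used in parallel
    resource_dict={
        "cores": 1,               # MPI cores per function call
        "threads_per_core": 1,    # OpenMP threads per function call
        "cwd": None,              # working directory for the parallel task
    },
    block_allocation=True,        # resources are fixed on the executor
)
# With backend="local" and block_allocation=True this returns an InteractiveExecutor;
# with block_allocation=False it would return an InteractiveStepExecutor instead.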
Review comment:

Update error message to match supported backends. The error message lists backends that do not match the values actually handled in the code: it mentions "slurm_submission" and "flux_submission", which are not handled anywhere in this function. Apply this diff to fix the error message:

📝 Committable suggestion
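The committable suggestion itself is not included above. As a hedged guess only, based on the branches create_executor() actually handles rather than the reviewer's exact wording, the corrected message might read:

    else:
        raise ValueError(
            # Hypothetical wording: list only the backends handled in create_executor().
            "The supported backends are flux_allocation, slurm_allocation and local."
        )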