diff --git a/.github/workflows/unittest-flux-mpich.yml b/.github/workflows/unittest-flux-mpich.yml index 7d2b5b76..79ff685a 100644 --- a/.github/workflows/unittest-flux-mpich.yml +++ b/.github/workflows/unittest-flux-mpich.yml @@ -34,4 +34,4 @@ jobs: timeout-minutes: 5 run: > flux start - python -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py tests/test_cache_executor_pysqa_flux.py; + python -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py tests/test_cache_executor_pysqa_flux.py tests/test_plot_dependency_flux.py; diff --git a/.github/workflows/unittest-flux-openmpi.yml b/.github/workflows/unittest-flux-openmpi.yml index c954553e..1cb61314 100644 --- a/.github/workflows/unittest-flux-openmpi.yml +++ b/.github/workflows/unittest-flux-openmpi.yml @@ -34,7 +34,7 @@ jobs: timeout-minutes: 5 run: > flux start - coverage run -a --omit="executorlib/_version.py,tests/*" -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py tests/test_cache_executor_pysqa_flux.py; + coverage run -a --omit="executorlib/_version.py,tests/*" -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py tests/test_cache_executor_pysqa_flux.py tests/test_plot_dependency_flux.py; coverage xml env: PYMPIPOOL_PMIX: "pmix" diff --git a/executorlib/__init__.py b/executorlib/__init__.py index 71051d25..16180430 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -1,251 +1,19 @@ -from typing import Callable, Optional - from executorlib._version import get_versions as _get_versions -from executorlib.interactive.create import create_executor as _create_executor -from executorlib.interactive.executor import ( - ExecutorWithDependencies as _ExecutorWithDependencies, -) -from executorlib.standalone.inputcheck import ( - check_plot_dependency_graph as _check_plot_dependency_graph, -) -from executorlib.standalone.inputcheck import ( - check_pysqa_config_directory as _check_pysqa_config_directory, +from executorlib.interfaces.flux import ( + FluxClusterExecutor, + FluxJobExecutor, ) -from executorlib.standalone.inputcheck import ( - check_refresh_rate as _check_refresh_rate, +from executorlib.interfaces.single import SingleNodeExecutor +from executorlib.interfaces.slurm import ( + SlurmClusterExecutor, + SlurmJobExecutor, ) __version__ = _get_versions()["version"] -__all__: list = [] - - -class Executor: - """ - The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or - preferable the flux framework for distributing python functions within a given resource allocation. In contrast to - the mpi4py.futures.MPIPoolExecutor the executorlib.Executor can be executed in a serial python process and does not - require the python script to be executed with MPI. It is even possible to execute the executorlib.Executor directly - in an interactive Jupyter notebook. - - Args: - max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of - cores which can be used in parallel - just like the max_cores parameter. Using max_cores is - recommended, as computers have a limited number of compute cores. - backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local". - cache_directory (str, optional): The directory to store cache files. Defaults to "cache". - max_cores (int): defines the number cores which can be used in parallel - resource_dict (dict): A dictionary of resources required by the task. 
With the following keys: - - cores (int): number of MPI cores to be used for each function call - - threads_per_core (int): number of OpenMP threads to be used for each function call - - gpus_per_core (int): number of GPUs per worker - defaults to 0 - - cwd (str/None): current working directory where the parallel python task is executed - - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and - SLURM only) - default False - - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only) - flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux - flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) - flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function. - flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False. - pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). - hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the - context of an HPC cluster this essential to be able to communicate to an - Executor running on a different compute node within the same allocation. And - in principle any computer should be able to resolve that their own hostname - points to the same address as localhost. Still MacOS >= 12 seems to disable - this look up for security reasons. So on MacOS it is required to set this - option to true - block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource - requirements, executorlib supports block allocation. In this case all resources have - to be defined on the executor, rather than during the submission of the individual - function. - init_function (None): optional function to preset arguments for functions which are submitted later - disable_dependencies (boolean): Disable resolving future objects during the submission. - refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked. - plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For - debugging purposes and to get an overview of the specified dependencies. - plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. 
- - Examples: - ``` - >>> import numpy as np - >>> from executorlib import Executor - >>> - >>> def calc(i, j, k): - >>> from mpi4py import MPI - >>> size = MPI.COMM_WORLD.Get_size() - >>> rank = MPI.COMM_WORLD.Get_rank() - >>> return np.array([i, j, k]), size, rank - >>> - >>> def init_k(): - >>> return {"k": 3} - >>> - >>> with Executor(cores=2, init_function=init_k) as p: - >>> fs = p.submit(calc, 2, j=4) - >>> print(fs.result()) - [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] - ``` - """ - - def __init__( - self, - max_workers: Optional[int] = None, - backend: str = "local", - cache_directory: Optional[str] = None, - max_cores: Optional[int] = None, - resource_dict: Optional[dict] = None, - flux_executor=None, - flux_executor_pmi_mode: Optional[str] = None, - flux_executor_nesting: bool = False, - flux_log_files: bool = False, - pysqa_config_directory: Optional[str] = None, - hostname_localhost: Optional[bool] = None, - block_allocation: bool = False, - init_function: Optional[Callable] = None, - disable_dependencies: bool = False, - refresh_rate: float = 0.01, - plot_dependency_graph: bool = False, - plot_dependency_graph_filename: Optional[str] = None, - ): - # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion. - pass - - def __new__( - cls, - max_workers: Optional[int] = None, - backend: str = "local", - cache_directory: Optional[str] = None, - max_cores: Optional[int] = None, - resource_dict: Optional[dict] = None, - flux_executor=None, - flux_executor_pmi_mode: Optional[str] = None, - flux_executor_nesting: bool = False, - flux_log_files: bool = False, - pysqa_config_directory: Optional[str] = None, - hostname_localhost: Optional[bool] = None, - block_allocation: bool = False, - init_function: Optional[Callable] = None, - disable_dependencies: bool = False, - refresh_rate: float = 0.01, - plot_dependency_graph: bool = False, - plot_dependency_graph_filename: Optional[str] = None, - ): - """ - Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor, - executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor depending on which backend is available. The - executorlib.flux.PyFluxExecutor is the preferred choice while the executorlib.mpi.PyMPIExecutor is primarily used - for development and testing. The executorlib.flux.PyFluxExecutor requires flux-core from the flux-framework to be - installed and in addition flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor - requires the SLURM workload manager to be installed on the system. - - Args: - max_workers (int): for backwards compatibility with the standard library, max_workers also defines the - number of cores which can be used in parallel - just like the max_cores parameter. Using - max_cores is recommended, as computers have a limited number of compute cores. - backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local". - cache_directory (str, optional): The directory to store cache files. Defaults to "cache". - max_cores (int): defines the number cores which can be used in parallel - resource_dict (dict): A dictionary of resources required by the task. 
With the following keys: - - cores (int): number of MPI cores to be used for each function call - - threads_per_core (int): number of OpenMP threads to be used for each function call - - gpus_per_core (int): number of GPUs per worker - defaults to 0 - - cwd (str/None): current working directory where the parallel python task is executed - - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI - and SLURM only) - default False - - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM - only) - flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux - flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) - flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function. - flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False. - pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). - hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the - context of an HPC cluster this essential to be able to communicate to an - Executor running on a different compute node within the same allocation. And - in principle any computer should be able to resolve that their own hostname - points to the same address as localhost. Still MacOS >= 12 seems to disable - this look up for security reasons. So on MacOS it is required to set this - option to true - block_allocation (boolean): To accelerate the submission of a series of python functions with the same - resource requirements, executorlib supports block allocation. In this case all - resources have to be defined on the executor, rather than during the submission - of the individual function. - init_function (None): optional function to preset arguments for functions which are submitted later - disable_dependencies (boolean): Disable resolving future objects during the submission. - refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked. - plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For - debugging purposes and to get an overview of the specified dependencies. - plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. 
- - """ - default_resource_dict: dict = { - "cores": 1, - "threads_per_core": 1, - "gpus_per_core": 0, - "cwd": None, - "openmpi_oversubscribe": False, - "slurm_cmd_args": [], - } - if resource_dict is None: - resource_dict = {} - resource_dict.update( - {k: v for k, v in default_resource_dict.items() if k not in resource_dict} - ) - if "_submission" in backend and not plot_dependency_graph: - from executorlib.cache.executor import create_file_executor - - return create_file_executor( - max_workers=max_workers, - backend=backend, - max_cores=max_cores, - cache_directory=cache_directory, - resource_dict=resource_dict, - flux_executor=flux_executor, - flux_executor_pmi_mode=flux_executor_pmi_mode, - flux_executor_nesting=flux_executor_nesting, - flux_log_files=flux_log_files, - pysqa_config_directory=pysqa_config_directory, - hostname_localhost=hostname_localhost, - block_allocation=block_allocation, - init_function=init_function, - disable_dependencies=disable_dependencies, - ) - elif not disable_dependencies: - _check_pysqa_config_directory(pysqa_config_directory=pysqa_config_directory) - return _ExecutorWithDependencies( - executor=_create_executor( - max_workers=max_workers, - backend=backend, - cache_directory=cache_directory, - max_cores=max_cores, - resource_dict=resource_dict, - flux_executor=flux_executor, - flux_executor_pmi_mode=flux_executor_pmi_mode, - flux_executor_nesting=flux_executor_nesting, - flux_log_files=flux_log_files, - hostname_localhost=hostname_localhost, - block_allocation=block_allocation, - init_function=init_function, - ), - max_cores=max_cores, - refresh_rate=refresh_rate, - plot_dependency_graph=plot_dependency_graph, - plot_dependency_graph_filename=plot_dependency_graph_filename, - ) - else: - _check_pysqa_config_directory(pysqa_config_directory=pysqa_config_directory) - _check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph) - _check_refresh_rate(refresh_rate=refresh_rate) - return _create_executor( - max_workers=max_workers, - backend=backend, - cache_directory=cache_directory, - max_cores=max_cores, - resource_dict=resource_dict, - flux_executor=flux_executor, - flux_executor_pmi_mode=flux_executor_pmi_mode, - flux_executor_nesting=flux_executor_nesting, - flux_log_files=flux_log_files, - hostname_localhost=hostname_localhost, - block_allocation=block_allocation, - init_function=init_function, - ) +__all__: list = [ + "FluxJobExecutor", + "FluxClusterExecutor", + "SingleNodeExecutor", + "SlurmJobExecutor", + "SlurmClusterExecutor", +] diff --git a/executorlib/interactive/create.py b/executorlib/interactive/create.py deleted file mode 100644 index 61ed49e6..00000000 --- a/executorlib/interactive/create.py +++ /dev/null @@ -1,295 +0,0 @@ -from typing import Callable, Optional, Union - -from executorlib.interactive.shared import ( - InteractiveExecutor, - InteractiveStepExecutor, -) -from executorlib.interactive.slurm import SrunSpawner -from executorlib.interactive.slurm import ( - validate_max_workers as validate_max_workers_slurm, -) -from executorlib.standalone.inputcheck import ( - check_command_line_argument_lst, - check_executor, - check_flux_log_files, - check_gpus_per_worker, - check_init_function, - check_nested_flux_executor, - check_oversubscribe, - check_pmi, - validate_number_of_cores, -) -from executorlib.standalone.interactive.spawner import MpiExecSpawner - -try: # The PyFluxExecutor requires flux-base to be installed. 
- from executorlib.interactive.flux import FluxPythonSpawner - from executorlib.interactive.flux import ( - validate_max_workers as validate_max_workers_flux, - ) -except ImportError: - pass - - -def create_executor( - max_workers: Optional[int] = None, - backend: str = "local", - max_cores: Optional[int] = None, - cache_directory: Optional[str] = None, - resource_dict: Optional[dict] = None, - flux_executor=None, - flux_executor_pmi_mode: Optional[str] = None, - flux_executor_nesting: bool = False, - flux_log_files: bool = False, - hostname_localhost: Optional[bool] = None, - block_allocation: bool = False, - init_function: Optional[Callable] = None, -) -> Union[InteractiveStepExecutor, InteractiveExecutor]: - """ - Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor, - executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor depending on which backend is available. The - executorlib.flux.PyFluxExecutor is the preferred choice while the executorlib.mpi.PyMPIExecutor is primarily used - for development and testing. The executorlib.flux.PyFluxExecutor requires flux-base from the flux-framework to be - installed and in addition flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor - requires the SLURM workload manager to be installed on the system. - - Args: - max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of - cores which can be used in parallel - just like the max_cores parameter. Using max_cores is - recommended, as computers have a limited number of compute cores. - backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local". - max_cores (int): defines the number cores which can be used in parallel - cache_directory (str, optional): The directory to store cache files. Defaults to "cache". - resource_dict (dict): A dictionary of resources required by the task. With the following keys: - - cores (int): number of MPI cores to be used for each function call - - threads_per_core (int): number of OpenMP threads to be used for each function call - - gpus_per_core (int): number of GPUs per worker - defaults to 0 - - cwd (str/None): current working directory where the parallel python task is executed - - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and - SLURM only) - default False - - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only) - flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux - flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) - flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function. - flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False. - hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the - context of an HPC cluster this essential to be able to communicate to an Executor - running on a different compute node within the same allocation. And in principle - any computer should be able to resolve that their own hostname points to the same - address as localhost. Still MacOS >= 12 seems to disable this look up for security - reasons. 
So on MacOS it is required to set this option to true - block_allocation (boolean): To accelerate the submission of a series of python functions with the same - resource requirements, executorlib supports block allocation. In this case all - resources have to be defined on the executor, rather than during the submission - of the individual function. - init_function (None): optional function to preset arguments for functions which are submitted later - """ - if resource_dict is None: - resource_dict = {} - if flux_executor is not None and backend != "flux_allocation": - backend = "flux_allocation" - if backend == "flux_allocation": - check_init_function( - block_allocation=block_allocation, init_function=init_function - ) - check_pmi(backend=backend, pmi=flux_executor_pmi_mode) - resource_dict["cache_directory"] = cache_directory - resource_dict["hostname_localhost"] = hostname_localhost - check_oversubscribe( - oversubscribe=resource_dict.get("openmpi_oversubscribe", False) - ) - check_command_line_argument_lst( - command_line_argument_lst=resource_dict.get("slurm_cmd_args", []) - ) - return create_flux_allocation_executor( - max_workers=max_workers, - max_cores=max_cores, - cache_directory=cache_directory, - resource_dict=resource_dict, - flux_executor=flux_executor, - flux_executor_pmi_mode=flux_executor_pmi_mode, - flux_executor_nesting=flux_executor_nesting, - flux_log_files=flux_log_files, - hostname_localhost=hostname_localhost, - block_allocation=block_allocation, - init_function=init_function, - ) - elif backend == "slurm_allocation": - check_pmi(backend=backend, pmi=flux_executor_pmi_mode) - check_executor(executor=flux_executor) - check_nested_flux_executor(nested_flux_executor=flux_executor_nesting) - check_flux_log_files(flux_log_files=flux_log_files) - return create_slurm_allocation_executor( - max_workers=max_workers, - max_cores=max_cores, - cache_directory=cache_directory, - resource_dict=resource_dict, - hostname_localhost=hostname_localhost, - block_allocation=block_allocation, - init_function=init_function, - ) - elif backend == "local": - check_pmi(backend=backend, pmi=flux_executor_pmi_mode) - check_executor(executor=flux_executor) - check_nested_flux_executor(nested_flux_executor=flux_executor_nesting) - check_flux_log_files(flux_log_files=flux_log_files) - return create_local_executor( - max_workers=max_workers, - max_cores=max_cores, - cache_directory=cache_directory, - resource_dict=resource_dict, - hostname_localhost=hostname_localhost, - block_allocation=block_allocation, - init_function=init_function, - ) - else: - raise ValueError( - "The supported backends are slurm_allocation, slurm_submission, flux_allocation, flux_submission and local." 
- ) - - -def create_flux_allocation_executor( - max_workers: Optional[int] = None, - max_cores: Optional[int] = None, - cache_directory: Optional[str] = None, - resource_dict: Optional[dict] = None, - flux_executor=None, - flux_executor_pmi_mode: Optional[str] = None, - flux_executor_nesting: bool = False, - flux_log_files: bool = False, - hostname_localhost: Optional[bool] = None, - block_allocation: bool = False, - init_function: Optional[Callable] = None, -) -> Union[InteractiveStepExecutor, InteractiveExecutor]: - check_init_function(block_allocation=block_allocation, init_function=init_function) - check_pmi(backend="flux_allocation", pmi=flux_executor_pmi_mode) - if resource_dict is None: - resource_dict = {} - cores_per_worker = resource_dict.get("cores", 1) - resource_dict["cache_directory"] = cache_directory - resource_dict["hostname_localhost"] = hostname_localhost - check_oversubscribe(oversubscribe=resource_dict.get("openmpi_oversubscribe", False)) - check_command_line_argument_lst( - command_line_argument_lst=resource_dict.get("slurm_cmd_args", []) - ) - if "openmpi_oversubscribe" in resource_dict: - del resource_dict["openmpi_oversubscribe"] - if "slurm_cmd_args" in resource_dict: - del resource_dict["slurm_cmd_args"] - resource_dict["flux_executor"] = flux_executor - resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode - resource_dict["flux_executor_nesting"] = flux_executor_nesting - resource_dict["flux_log_files"] = flux_log_files - if block_allocation: - resource_dict["init_function"] = init_function - max_workers = validate_number_of_cores( - max_cores=max_cores, - max_workers=max_workers, - cores_per_worker=cores_per_worker, - set_local_cores=False, - ) - validate_max_workers_flux( - max_workers=max_workers, - cores=cores_per_worker, - threads_per_core=resource_dict.get("threads_per_core", 1), - ) - return InteractiveExecutor( - max_workers=max_workers, - executor_kwargs=resource_dict, - spawner=FluxPythonSpawner, - ) - else: - return InteractiveStepExecutor( - max_cores=max_cores, - max_workers=max_workers, - executor_kwargs=resource_dict, - spawner=FluxPythonSpawner, - ) - - -def create_slurm_allocation_executor( - max_workers: Optional[int] = None, - max_cores: Optional[int] = None, - cache_directory: Optional[str] = None, - resource_dict: Optional[dict] = None, - hostname_localhost: Optional[bool] = None, - block_allocation: bool = False, - init_function: Optional[Callable] = None, -) -> Union[InteractiveStepExecutor, InteractiveExecutor]: - check_init_function(block_allocation=block_allocation, init_function=init_function) - if resource_dict is None: - resource_dict = {} - cores_per_worker = resource_dict.get("cores", 1) - resource_dict["cache_directory"] = cache_directory - resource_dict["hostname_localhost"] = hostname_localhost - if block_allocation: - resource_dict["init_function"] = init_function - max_workers = validate_number_of_cores( - max_cores=max_cores, - max_workers=max_workers, - cores_per_worker=cores_per_worker, - set_local_cores=False, - ) - validate_max_workers_slurm( - max_workers=max_workers, - cores=cores_per_worker, - threads_per_core=resource_dict.get("threads_per_core", 1), - ) - return InteractiveExecutor( - max_workers=max_workers, - executor_kwargs=resource_dict, - spawner=SrunSpawner, - ) - else: - return InteractiveStepExecutor( - max_cores=max_cores, - max_workers=max_workers, - executor_kwargs=resource_dict, - spawner=SrunSpawner, - ) - - -def create_local_executor( - max_workers: Optional[int] = None, - max_cores: 
Optional[int] = None, - cache_directory: Optional[str] = None, - resource_dict: Optional[dict] = None, - hostname_localhost: Optional[bool] = None, - block_allocation: bool = False, - init_function: Optional[Callable] = None, -) -> Union[InteractiveStepExecutor, InteractiveExecutor]: - check_init_function(block_allocation=block_allocation, init_function=init_function) - if resource_dict is None: - resource_dict = {} - cores_per_worker = resource_dict.get("cores", 1) - resource_dict["cache_directory"] = cache_directory - resource_dict["hostname_localhost"] = hostname_localhost - - check_gpus_per_worker(gpus_per_worker=resource_dict.get("gpus_per_core", 0)) - check_command_line_argument_lst( - command_line_argument_lst=resource_dict.get("slurm_cmd_args", []) - ) - if "threads_per_core" in resource_dict: - del resource_dict["threads_per_core"] - if "gpus_per_core" in resource_dict: - del resource_dict["gpus_per_core"] - if "slurm_cmd_args" in resource_dict: - del resource_dict["slurm_cmd_args"] - if block_allocation: - resource_dict["init_function"] = init_function - return InteractiveExecutor( - max_workers=validate_number_of_cores( - max_cores=max_cores, - max_workers=max_workers, - cores_per_worker=cores_per_worker, - set_local_cores=True, - ), - executor_kwargs=resource_dict, - spawner=MpiExecSpawner, - ) - else: - return InteractiveStepExecutor( - max_cores=max_cores, - max_workers=max_workers, - executor_kwargs=resource_dict, - spawner=MpiExecSpawner, - ) diff --git a/executorlib/interfaces/__init__.py b/executorlib/interfaces/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/executorlib/interfaces/flux.py b/executorlib/interfaces/flux.py new file mode 100644 index 00000000..30fb839d --- /dev/null +++ b/executorlib/interfaces/flux.py @@ -0,0 +1,514 @@ +from typing import Callable, Optional, Union + +from executorlib.interactive.executor import ExecutorWithDependencies +from executorlib.interactive.shared import ( + InteractiveExecutor, + InteractiveStepExecutor, +) +from executorlib.standalone.inputcheck import ( + check_command_line_argument_lst, + check_init_function, + check_oversubscribe, + check_plot_dependency_graph, + check_pmi, + check_refresh_rate, + validate_number_of_cores, +) + +try: # The PyFluxExecutor requires flux-base to be installed. + from executorlib.interactive.flux import FluxPythonSpawner + from executorlib.interactive.flux import ( + validate_max_workers as validate_max_workers_flux, + ) +except ImportError: + pass + + +class FluxJobExecutor: + """ + The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or + preferable the flux framework for distributing python functions within a given resource allocation. In contrast to + the mpi4py.futures.MPIPoolExecutor the executorlib.Executor can be executed in a serial python process and does not + require the python script to be executed with MPI. It is even possible to execute the executorlib.Executor directly + in an interactive Jupyter notebook. + + Args: + max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of + cores which can be used in parallel - just like the max_cores parameter. Using max_cores is + recommended, as computers have a limited number of compute cores. + cache_directory (str, optional): The directory to store cache files. Defaults to "cache". 
+        max_cores (int): defines the number of cores which can be used in parallel
+        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
+                              - cores (int): number of MPI cores to be used for each function call
+                              - threads_per_core (int): number of OpenMP threads to be used for each function call
+                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
+                              - cwd (str/None): current working directory where the parallel python task is executed
+                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
+                                                              SLURM only) - default False
+                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
+        flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
+        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix); default is None (Flux only)
+        flux_executor_nesting (bool): Provide a hierarchically nested Flux job scheduler inside the submitted function.
+        flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
+        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
+                                      context of an HPC cluster this is essential to be able to communicate with an
+                                      Executor running on a different compute node within the same allocation. In
+                                      principle any computer should be able to resolve that its own hostname
+                                      points to the same address as localhost. Still, MacOS >= 12 seems to disable
+                                      this lookup for security reasons, so on MacOS this option has to be set to
+                                      true.
+        block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource
+                                    requirements, executorlib supports block allocation. In this case all resources have
+                                    to be defined on the executor, rather than during the submission of the individual
+                                    function.
+        init_function (None): optional function to preset arguments for functions which are submitted later
+        disable_dependencies (boolean): Disable resolving future objects during the submission.
+        refresh_rate (float): Set the refresh rate in seconds, i.e. how frequently the input queue is checked.
+        plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
+                                      debugging purposes and to get an overview of the specified dependencies.
+        plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
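+
+    Note:
+        The following is a minimal sketch of the dependency resolution described
+        above, not output from a specific run: when a future from a previous
+        submission is passed as an argument, it is resolved before the dependent
+        function executes, unless disable_dependencies=True is set.
+        ```
+        >>> from executorlib.interfaces.flux import FluxJobExecutor
+        >>>
+        >>> def add(a, b):
+        >>>     return a + b
+        >>>
+        >>> with FluxJobExecutor(max_workers=1) as exe:
+        >>>     future_sum = exe.submit(add, 1, 2)
+        >>>     # future_sum is resolved to 3 before add() runs a second time
+        >>>     fs = exe.submit(add, future_sum, 3)
+        >>>     print(fs.result())
+        6
+        ```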
+ + Examples: + ``` + >>> import numpy as np + >>> from executorlib.interfaces.flux import FluxJobExecutor + >>> + >>> def calc(i, j, k): + >>> from mpi4py import MPI + >>> size = MPI.COMM_WORLD.Get_size() + >>> rank = MPI.COMM_WORLD.Get_rank() + >>> return np.array([i, j, k]), size, rank + >>> + >>> def init_k(): + >>> return {"k": 3} + >>> + >>> with FluxJobExecutor(max_workers=2, init_function=init_k) as p: + >>> fs = p.submit(calc, 2, j=4) + >>> print(fs.result()) + [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] + ``` + """ + + def __init__( + self, + max_workers: Optional[int] = None, + cache_directory: Optional[str] = None, + max_cores: Optional[int] = None, + resource_dict: Optional[dict] = None, + flux_executor=None, + flux_executor_pmi_mode: Optional[str] = None, + flux_executor_nesting: bool = False, + flux_log_files: bool = False, + hostname_localhost: Optional[bool] = None, + block_allocation: bool = False, + init_function: Optional[Callable] = None, + disable_dependencies: bool = False, + refresh_rate: float = 0.01, + plot_dependency_graph: bool = False, + plot_dependency_graph_filename: Optional[str] = None, + ): + # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion. + pass + + def __new__( + cls, + max_workers: Optional[int] = None, + cache_directory: Optional[str] = None, + max_cores: Optional[int] = None, + resource_dict: Optional[dict] = None, + flux_executor=None, + flux_executor_pmi_mode: Optional[str] = None, + flux_executor_nesting: bool = False, + flux_log_files: bool = False, + hostname_localhost: Optional[bool] = None, + block_allocation: bool = False, + init_function: Optional[Callable] = None, + disable_dependencies: bool = False, + refresh_rate: float = 0.01, + plot_dependency_graph: bool = False, + plot_dependency_graph_filename: Optional[str] = None, + ): + """ + Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor, + executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor depending on which backend is available. The + executorlib.flux.PyFluxExecutor is the preferred choice while the executorlib.mpi.PyMPIExecutor is primarily used + for development and testing. The executorlib.flux.PyFluxExecutor requires flux-core from the flux-framework to be + installed and in addition flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor + requires the SLURM workload manager to be installed on the system. + + Args: + max_workers (int): for backwards compatibility with the standard library, max_workers also defines the + number of cores which can be used in parallel - just like the max_cores parameter. Using + max_cores is recommended, as computers have a limited number of compute cores. + cache_directory (str, optional): The directory to store cache files. Defaults to "cache". + max_cores (int): defines the number cores which can be used in parallel + resource_dict (dict): A dictionary of resources required by the task. 
With the following keys:
+                                  - cores (int): number of MPI cores to be used for each function call
+                                  - threads_per_core (int): number of OpenMP threads to be used for each function call
+                                  - gpus_per_core (int): number of GPUs per worker - defaults to 0
+                                  - cwd (str/None): current working directory where the parallel python task is executed
+                                  - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
+                                                                  and SLURM only) - default False
+                                  - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
+                                                           only)
+            flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
+            flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix); default is None (Flux only)
+            flux_executor_nesting (bool): Provide a hierarchically nested Flux job scheduler inside the submitted function.
+            flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
+            hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
+                                          context of an HPC cluster this is essential to be able to communicate with an
+                                          Executor running on a different compute node within the same allocation. In
+                                          principle any computer should be able to resolve that its own hostname
+                                          points to the same address as localhost. Still, MacOS >= 12 seems to disable
+                                          this lookup for security reasons, so on MacOS this option has to be set to
+                                          true.
+            block_allocation (boolean): To accelerate the submission of a series of python functions with the same
+                                        resource requirements, executorlib supports block allocation. In this case all
+                                        resources have to be defined on the executor, rather than during the submission
+                                        of the individual function.
+            init_function (None): optional function to preset arguments for functions which are submitted later
+            disable_dependencies (boolean): Disable resolving future objects during the submission.
+            refresh_rate (float): Set the refresh rate in seconds, i.e. how frequently the input queue is checked.
+            plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
+                                          debugging purposes and to get an overview of the specified dependencies.
+            plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
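+
+            A sketch of how the graph plotting options interact (the filename is a
+            hypothetical example, not a required value): with plot_dependency_graph=True
+            the submitted functions are recorded but not executed, and the dependency
+            graph is stored in plot_dependency_graph_filename.
+            ```
+            >>> with FluxJobExecutor(
+            >>>     plot_dependency_graph=True,
+            >>>     plot_dependency_graph_filename="graph.png",
+            >>> ) as exe:
+            >>>     fs = exe.submit(sum, [1, 2, 3])
+            ```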
+ + """ + default_resource_dict: dict = { + "cores": 1, + "threads_per_core": 1, + "gpus_per_core": 0, + "cwd": None, + "openmpi_oversubscribe": False, + "slurm_cmd_args": [], + } + if resource_dict is None: + resource_dict = {} + resource_dict.update( + {k: v for k, v in default_resource_dict.items() if k not in resource_dict} + ) + if not disable_dependencies: + return ExecutorWithDependencies( + executor=create_flux_executor( + max_workers=max_workers, + cache_directory=cache_directory, + max_cores=max_cores, + resource_dict=resource_dict, + flux_executor=flux_executor, + flux_executor_pmi_mode=flux_executor_pmi_mode, + flux_executor_nesting=flux_executor_nesting, + flux_log_files=flux_log_files, + hostname_localhost=hostname_localhost, + block_allocation=block_allocation, + init_function=init_function, + ), + max_cores=max_cores, + refresh_rate=refresh_rate, + plot_dependency_graph=plot_dependency_graph, + plot_dependency_graph_filename=plot_dependency_graph_filename, + ) + else: + check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph) + check_refresh_rate(refresh_rate=refresh_rate) + return create_flux_executor( + max_workers=max_workers, + cache_directory=cache_directory, + max_cores=max_cores, + resource_dict=resource_dict, + flux_executor=flux_executor, + flux_executor_pmi_mode=flux_executor_pmi_mode, + flux_executor_nesting=flux_executor_nesting, + flux_log_files=flux_log_files, + hostname_localhost=hostname_localhost, + block_allocation=block_allocation, + init_function=init_function, + ) + + +class FluxClusterExecutor: + """ + The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or + preferable the flux framework for distributing python functions within a given resource allocation. In contrast to + the mpi4py.futures.MPIPoolExecutor the executorlib.Executor can be executed in a serial python process and does not + require the python script to be executed with MPI. It is even possible to execute the executorlib.Executor directly + in an interactive Jupyter notebook. + + Args: + max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of + cores which can be used in parallel - just like the max_cores parameter. Using max_cores is + recommended, as computers have a limited number of compute cores. + cache_directory (str, optional): The directory to store cache files. Defaults to "cache". + max_cores (int): defines the number cores which can be used in parallel + resource_dict (dict): A dictionary of resources required by the task. With the following keys: + - cores (int): number of MPI cores to be used for each function call + - threads_per_core (int): number of OpenMP threads to be used for each function call + - gpus_per_core (int): number of GPUs per worker - defaults to 0 + - cwd (str/None): current working directory where the parallel python task is executed + - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and + SLURM only) - default False + - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only) + pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). + hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the + context of an HPC cluster this essential to be able to communicate to an + Executor running on a different compute node within the same allocation. 
And + in principle any computer should be able to resolve that their own hostname + points to the same address as localhost. Still MacOS >= 12 seems to disable + this look up for security reasons. So on MacOS it is required to set this + option to true + block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource + requirements, executorlib supports block allocation. In this case all resources have + to be defined on the executor, rather than during the submission of the individual + function. + init_function (None): optional function to preset arguments for functions which are submitted later + disable_dependencies (boolean): Disable resolving future objects during the submission. + refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked. + plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For + debugging purposes and to get an overview of the specified dependencies. + plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + + Examples: + ``` + >>> import numpy as np + >>> from executorlib.interfaces.flux import FluxClusterExecutor + >>> + >>> def calc(i, j, k): + >>> from mpi4py import MPI + >>> size = MPI.COMM_WORLD.Get_size() + >>> rank = MPI.COMM_WORLD.Get_rank() + >>> return np.array([i, j, k]), size, rank + >>> + >>> def init_k(): + >>> return {"k": 3} + >>> + >>> with FluxClusterExecutor(max_workers=2, init_function=init_k) as p: + >>> fs = p.submit(calc, 2, j=4) + >>> print(fs.result()) + [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] + ``` + """ + + def __init__( + self, + max_workers: Optional[int] = None, + cache_directory: Optional[str] = None, + max_cores: Optional[int] = None, + resource_dict: Optional[dict] = None, + pysqa_config_directory: Optional[str] = None, + hostname_localhost: Optional[bool] = None, + block_allocation: bool = False, + init_function: Optional[Callable] = None, + disable_dependencies: bool = False, + refresh_rate: float = 0.01, + plot_dependency_graph: bool = False, + plot_dependency_graph_filename: Optional[str] = None, + ): + # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion. + pass + + def __new__( + cls, + max_workers: Optional[int] = None, + cache_directory: Optional[str] = None, + max_cores: Optional[int] = None, + resource_dict: Optional[dict] = None, + pysqa_config_directory: Optional[str] = None, + hostname_localhost: Optional[bool] = None, + block_allocation: bool = False, + init_function: Optional[Callable] = None, + disable_dependencies: bool = False, + refresh_rate: float = 0.01, + plot_dependency_graph: bool = False, + plot_dependency_graph_filename: Optional[str] = None, + ): + """ + Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor, + executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor depending on which backend is available. The + executorlib.flux.PyFluxExecutor is the preferred choice while the executorlib.mpi.PyMPIExecutor is primarily used + for development and testing. The executorlib.flux.PyFluxExecutor requires flux-core from the flux-framework to be + installed and in addition flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor + requires the SLURM workload manager to be installed on the system. 
+
+        Args:
+            max_workers (int): for backwards compatibility with the standard library, max_workers also defines the
+                               number of cores which can be used in parallel - just like the max_cores parameter. Using
+                               max_cores is recommended, as computers have a limited number of compute cores.
+            cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
+            max_cores (int): defines the number of cores which can be used in parallel
+            resource_dict (dict): A dictionary of resources required by the task. With the following keys:
+                                  - cores (int): number of MPI cores to be used for each function call
+                                  - threads_per_core (int): number of OpenMP threads to be used for each function call
+                                  - gpus_per_core (int): number of GPUs per worker - defaults to 0
+                                  - cwd (str/None): current working directory where the parallel python task is executed
+                                  - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
+                                                                  and SLURM only) - default False
+                                  - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
+                                                           only)
+            pysqa_config_directory (str, optional): path to the pysqa config directory (only for the pysqa based backend).
+            hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
+                                          context of an HPC cluster this is essential to be able to communicate with an
+                                          Executor running on a different compute node within the same allocation. In
+                                          principle any computer should be able to resolve that its own hostname
+                                          points to the same address as localhost. Still, MacOS >= 12 seems to disable
+                                          this lookup for security reasons, so on MacOS this option has to be set to
+                                          true.
+            block_allocation (boolean): To accelerate the submission of a series of python functions with the same
+                                        resource requirements, executorlib supports block allocation. In this case all
+                                        resources have to be defined on the executor, rather than during the submission
+                                        of the individual function.
+            init_function (None): optional function to preset arguments for functions which are submitted later
+            disable_dependencies (boolean): Disable resolving future objects during the submission.
+            refresh_rate (float): Set the refresh rate in seconds, i.e. how frequently the input queue is checked.
+            plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
+                                          debugging purposes and to get an overview of the specified dependencies.
+            plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
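+
+            Before use, the resource dictionary is merged with the documented
+            defaults; keys supplied by the caller always take precedence. A worked
+            example with illustrative values:
+            ```
+            >>> # resource_dict = {"cores": 2} passed by the caller becomes
+            >>> # {"cores": 2, "threads_per_core": 1, "gpus_per_core": 0,
+            >>> #  "cwd": None, "openmpi_oversubscribe": False, "slurm_cmd_args": []}
+            ```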
+ + """ + default_resource_dict: dict = { + "cores": 1, + "threads_per_core": 1, + "gpus_per_core": 0, + "cwd": None, + "openmpi_oversubscribe": False, + "slurm_cmd_args": [], + } + if resource_dict is None: + resource_dict = {} + resource_dict.update( + {k: v for k, v in default_resource_dict.items() if k not in resource_dict} + ) + if not plot_dependency_graph: + from executorlib.cache.executor import create_file_executor + + return create_file_executor( + max_workers=max_workers, + backend="flux_submission", + max_cores=max_cores, + cache_directory=cache_directory, + resource_dict=resource_dict, + flux_executor=None, + flux_executor_pmi_mode=None, + flux_executor_nesting=False, + flux_log_files=False, + pysqa_config_directory=pysqa_config_directory, + hostname_localhost=hostname_localhost, + block_allocation=block_allocation, + init_function=init_function, + disable_dependencies=disable_dependencies, + ) + else: + return ExecutorWithDependencies( + executor=create_flux_executor( + max_workers=max_workers, + cache_directory=cache_directory, + max_cores=max_cores, + resource_dict=resource_dict, + flux_executor=None, + flux_executor_pmi_mode=None, + flux_executor_nesting=False, + flux_log_files=False, + hostname_localhost=hostname_localhost, + block_allocation=block_allocation, + init_function=init_function, + ), + max_cores=max_cores, + refresh_rate=refresh_rate, + plot_dependency_graph=plot_dependency_graph, + plot_dependency_graph_filename=plot_dependency_graph_filename, + ) + + +def create_flux_executor( + max_workers: Optional[int] = None, + max_cores: Optional[int] = None, + cache_directory: Optional[str] = None, + resource_dict: Optional[dict] = None, + flux_executor=None, + flux_executor_pmi_mode: Optional[str] = None, + flux_executor_nesting: bool = False, + flux_log_files: bool = False, + hostname_localhost: Optional[bool] = None, + block_allocation: bool = False, + init_function: Optional[Callable] = None, +) -> Union[InteractiveStepExecutor, InteractiveExecutor]: + """ + Create a flux executor + + Args: + max_workers (int): for backwards compatibility with the standard library, max_workers also defines the + number of cores which can be used in parallel - just like the max_cores parameter. Using + max_cores is recommended, as computers have a limited number of compute cores. + max_cores (int): defines the number cores which can be used in parallel + cache_directory (str, optional): The directory to store cache files. Defaults to "cache". + resource_dict (dict): A dictionary of resources required by the task. With the following keys: + - cores (int): number of MPI cores to be used for each function call + - threads_per_core (int): number of OpenMP threads to be used for each function call + - gpus_per_core (int): number of GPUs per worker - defaults to 0 + - cwd (str/None): current working directory where the parallel python task is executed + - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI + and SLURM only) - default False + - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM + only) + flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux + flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) + flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function. + flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False. 
+ hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the + context of an HPC cluster this essential to be able to communicate to an + Executor running on a different compute node within the same allocation. And + in principle any computer should be able to resolve that their own hostname + points to the same address as localhost. Still MacOS >= 12 seems to disable + this look up for security reasons. So on MacOS it is required to set this + option to true + block_allocation (boolean): To accelerate the submission of a series of python functions with the same + resource requirements, executorlib supports block allocation. In this case all + resources have to be defined on the executor, rather than during the submission + of the individual function. + init_function (None): optional function to preset arguments for functions which are submitted later + + Returns: + InteractiveStepExecutor/ InteractiveExecutor + """ + if resource_dict is None: + resource_dict = {} + cores_per_worker = resource_dict.get("cores", 1) + resource_dict["cache_directory"] = cache_directory + resource_dict["hostname_localhost"] = hostname_localhost + check_init_function(block_allocation=block_allocation, init_function=init_function) + check_pmi(backend="flux_allocation", pmi=flux_executor_pmi_mode) + check_oversubscribe(oversubscribe=resource_dict.get("openmpi_oversubscribe", False)) + check_command_line_argument_lst( + command_line_argument_lst=resource_dict.get("slurm_cmd_args", []) + ) + if "openmpi_oversubscribe" in resource_dict: + del resource_dict["openmpi_oversubscribe"] + if "slurm_cmd_args" in resource_dict: + del resource_dict["slurm_cmd_args"] + resource_dict["flux_executor"] = flux_executor + resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode + resource_dict["flux_executor_nesting"] = flux_executor_nesting + resource_dict["flux_log_files"] = flux_log_files + if block_allocation: + resource_dict["init_function"] = init_function + max_workers = validate_number_of_cores( + max_cores=max_cores, + max_workers=max_workers, + cores_per_worker=cores_per_worker, + set_local_cores=False, + ) + validate_max_workers_flux( + max_workers=max_workers, + cores=cores_per_worker, + threads_per_core=resource_dict.get("threads_per_core", 1), + ) + return InteractiveExecutor( + max_workers=max_workers, + executor_kwargs=resource_dict, + spawner=FluxPythonSpawner, + ) + else: + return InteractiveStepExecutor( + max_cores=max_cores, + max_workers=max_workers, + executor_kwargs=resource_dict, + spawner=FluxPythonSpawner, + ) diff --git a/executorlib/interfaces/single.py b/executorlib/interfaces/single.py new file mode 100644 index 00000000..ec594c44 --- /dev/null +++ b/executorlib/interfaces/single.py @@ -0,0 +1,274 @@ +from typing import Callable, Optional, Union + +from executorlib.interactive.executor import ExecutorWithDependencies +from executorlib.interactive.shared import ( + InteractiveExecutor, + InteractiveStepExecutor, +) +from executorlib.standalone.inputcheck import ( + check_command_line_argument_lst, + check_gpus_per_worker, + check_init_function, + check_plot_dependency_graph, + check_refresh_rate, + validate_number_of_cores, +) +from executorlib.standalone.interactive.spawner import MpiExecSpawner + + +class SingleNodeExecutor: + """ + The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or + preferable the flux framework for distributing python functions within a given resource allocation. 
In contrast to
+    the mpi4py.futures.MPIPoolExecutor the executorlib.Executor can be executed in a serial python process and does not
+    require the python script to be executed with MPI. It is even possible to execute the executorlib.Executor directly
+    in an interactive Jupyter notebook.
+
+    Args:
+        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
+                           cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
+                           recommended, as computers have a limited number of compute cores.
+        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
+        max_cores (int): defines the number of cores which can be used in parallel
+        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
+                              - cores (int): number of MPI cores to be used for each function call
+                              - threads_per_core (int): number of OpenMP threads to be used for each function call
+                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
+                              - cwd (str/None): current working directory where the parallel python task is executed
+                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
+                                                              SLURM only) - default False
+                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
+        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
+                                      context of an HPC cluster this is essential to be able to communicate with an
+                                      Executor running on a different compute node within the same allocation. In
+                                      principle any computer should be able to resolve that its own hostname
+                                      points to the same address as localhost. Still, MacOS >= 12 seems to disable
+                                      this lookup for security reasons, so on MacOS this option has to be set to
+                                      true.
+        block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource
+                                    requirements, executorlib supports block allocation. In this case all resources have
+                                    to be defined on the executor, rather than during the submission of the individual
+                                    function.
+        init_function (None): optional function to preset arguments for functions which are submitted later
+        disable_dependencies (boolean): Disable resolving future objects during the submission.
+        refresh_rate (float): Set the refresh rate in seconds, i.e. how frequently the input queue is checked.
+        plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
+                                      debugging purposes and to get an overview of the specified dependencies.
+        plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
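+
+    Note:
+        A minimal sketch of the block allocation mode described above (the numbers
+        are illustrative): with block_allocation=True the resources are fixed on the
+        executor itself and reused by all submitted functions, so they are not
+        specified again per submission.
+        ```
+        >>> with SingleNodeExecutor(max_cores=2, block_allocation=True) as exe:
+        >>>     fs = exe.submit(sum, [1, 2, 3])
+        >>>     print(fs.result())
+        6
+        ```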
+
+    Examples:
+        ```
+        >>> import numpy as np
+        >>> from executorlib.interfaces.single import SingleNodeExecutor
+        >>>
+        >>> def calc(i, j, k):
+        >>>     from mpi4py import MPI
+        >>>     size = MPI.COMM_WORLD.Get_size()
+        >>>     rank = MPI.COMM_WORLD.Get_rank()
+        >>>     return np.array([i, j, k]), size, rank
+        >>>
+        >>> def init_k():
+        >>>     return {"k": 3}
+        >>>
+        >>> with SingleNodeExecutor(max_workers=2, init_function=init_k) as p:
+        >>>     fs = p.submit(calc, 2, j=4)
+        >>>     print(fs.result())
+        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
+        ```
+    """
+
+    def __init__(
+        self,
+        max_workers: Optional[int] = None,
+        cache_directory: Optional[str] = None,
+        max_cores: Optional[int] = None,
+        resource_dict: Optional[dict] = None,
+        hostname_localhost: Optional[bool] = None,
+        block_allocation: bool = False,
+        init_function: Optional[Callable] = None,
+        disable_dependencies: bool = False,
+        refresh_rate: float = 0.01,
+        plot_dependency_graph: bool = False,
+        plot_dependency_graph_filename: Optional[str] = None,
+    ):
+        # The instance is created in __new__(); __init__() is only implemented to enable auto-completion.
+        pass
+
+    def __new__(
+        cls,
+        max_workers: Optional[int] = None,
+        cache_directory: Optional[str] = None,
+        max_cores: Optional[int] = None,
+        resource_dict: Optional[dict] = None,
+        hostname_localhost: Optional[bool] = None,
+        block_allocation: bool = False,
+        init_function: Optional[Callable] = None,
+        disable_dependencies: bool = False,
+        refresh_rate: float = 0.01,
+        plot_dependency_graph: bool = False,
+        plot_dependency_graph_filename: Optional[str] = None,
+    ):
+        """
+        Instead of returning an executorlib.Executor object this function returns either an
+        executorlib.mpi.PyMPIExecutor, an executorlib.slurm.PySlurmExecutor or an executorlib.flux.PyFluxExecutor,
+        depending on which backend is available. The executorlib.flux.PyFluxExecutor is the preferred choice, while
+        the executorlib.mpi.PyMPIExecutor is primarily used for development and testing. The
+        executorlib.flux.PyFluxExecutor requires flux-core from the flux-framework to be installed and, in addition,
+        flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor requires the SLURM
+        workload manager to be installed on the system.
+
+        Args:
+            max_workers (int): for backwards compatibility with the standard library, max_workers also defines the
+                               number of cores which can be used in parallel - just like the max_cores parameter.
+                               Using max_cores is recommended, as computers have a limited number of compute cores.
+            cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
+            max_cores (int): defines the number of cores which can be used in parallel
+            resource_dict (dict): A dictionary of resources required by the task, with the following keys:
+                                  - cores (int): number of MPI cores to be used for each function call
+                                  - threads_per_core (int): number of OpenMP threads to be used for each function call
+                                  - gpus_per_core (int): number of GPUs per worker - defaults to 0
+                                  - cwd (str/None): current working directory where the parallel python task is
+                                                    executed
+                                  - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
+                                                                  and SLURM only) - default False
+                                  - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
+                                                           only)
+            hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
+                                          context of an HPC cluster this is essential to be able to communicate with an
+                                          Executor running on a different compute node within the same allocation. In
+                                          principle any computer should be able to resolve that its own hostname points
+                                          to the same address as localhost, but macOS >= 12 seems to disable this
+                                          lookup for security reasons, so on macOS this option has to be set to True.
+            block_allocation (boolean): To accelerate the submission of a series of python functions with the same
+                                        resource requirements, executorlib supports block allocation. In this case all
+                                        resources have to be defined on the executor, rather than during the submission
+                                        of the individual function.
+            init_function (None): optional function to preset arguments for functions which are submitted later
+            disable_dependencies (boolean): Disable resolving future objects during the submission.
+            refresh_rate (float): Set the refresh rate in seconds, i.e. how frequently the input queue is checked.
+            plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
+                                          debugging purposes and to get an overview of the specified dependencies.
+            plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
+
+        """
+        default_resource_dict: dict = {
+            "cores": 1,
+            "threads_per_core": 1,
+            "gpus_per_core": 0,
+            "cwd": None,
+            "openmpi_oversubscribe": False,
+            "slurm_cmd_args": [],
+        }
+        if resource_dict is None:
+            resource_dict = {}
+        resource_dict.update(
+            {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
+        )
+        if not disable_dependencies:
+            return ExecutorWithDependencies(
+                executor=create_single_node_executor(
+                    max_workers=max_workers,
+                    cache_directory=cache_directory,
+                    max_cores=max_cores,
+                    resource_dict=resource_dict,
+                    hostname_localhost=hostname_localhost,
+                    block_allocation=block_allocation,
+                    init_function=init_function,
+                ),
+                max_cores=max_cores,
+                refresh_rate=refresh_rate,
+                plot_dependency_graph=plot_dependency_graph,
+                plot_dependency_graph_filename=plot_dependency_graph_filename,
+            )
+        else:
+            check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph)
+            check_refresh_rate(refresh_rate=refresh_rate)
+            return create_single_node_executor(
+                max_workers=max_workers,
+                cache_directory=cache_directory,
+                max_cores=max_cores,
+                resource_dict=resource_dict,
+                hostname_localhost=hostname_localhost,
+                block_allocation=block_allocation,
+                init_function=init_function,
+            )
+
+
+def create_single_node_executor(
+    max_workers: Optional[int] = None,
+    max_cores: Optional[int] = None,
+    cache_directory: Optional[str] = None,
+    resource_dict: Optional[dict] = None,
+    hostname_localhost: Optional[bool] = None,
+    block_allocation: bool = False,
+    init_function: Optional[Callable] = None,
+) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
+    """
+    Create a single node executor
+
+    Args:
+        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
+                           of cores which can be used in parallel - just like the max_cores parameter. Using max_cores
+                           is recommended, as computers have a limited number of compute cores.
+        max_cores (int): defines the number of cores which can be used in parallel
+        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
+        resource_dict (dict): A dictionary of resources required by the task, with the following keys:
+                              - cores (int): number of MPI cores to be used for each function call
+                              - threads_per_core (int): number of OpenMP threads to be used for each function call
+                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
+                              - cwd (str/None): current working directory where the parallel python task is executed
+                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
+                                                              SLURM only) - default False
+                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
+        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
+                                      context of an HPC cluster this is essential to be able to communicate with an
+                                      Executor running on a different compute node within the same allocation. In
+                                      principle any computer should be able to resolve that its own hostname points to
+                                      the same address as localhost, but macOS >= 12 seems to disable this lookup for
+                                      security reasons, so on macOS this option has to be set to True.
+        block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource
+                                    requirements, executorlib supports block allocation. In this case all resources
+                                    have to be defined on the executor, rather than during the submission of the
+                                    individual function.
+        init_function (None): optional function to preset arguments for functions which are submitted later
+
+    Returns:
+        InteractiveStepExecutor/InteractiveExecutor
+    """
+    if resource_dict is None:
+        resource_dict = {}
+    cores_per_worker = resource_dict.get("cores", 1)
+    resource_dict["cache_directory"] = cache_directory
+    resource_dict["hostname_localhost"] = hostname_localhost
+
+    check_init_function(block_allocation=block_allocation, init_function=init_function)
+    check_gpus_per_worker(gpus_per_worker=resource_dict.get("gpus_per_core", 0))
+    check_command_line_argument_lst(
+        command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
+    )
+    if "threads_per_core" in resource_dict:
+        del resource_dict["threads_per_core"]
+    if "gpus_per_core" in resource_dict:
+        del resource_dict["gpus_per_core"]
+    if "slurm_cmd_args" in resource_dict:
+        del resource_dict["slurm_cmd_args"]
+    if block_allocation:
+        resource_dict["init_function"] = init_function
+        return InteractiveExecutor(
+            max_workers=validate_number_of_cores(
+                max_cores=max_cores,
+                max_workers=max_workers,
+                cores_per_worker=cores_per_worker,
+                set_local_cores=True,
+            ),
+            executor_kwargs=resource_dict,
+            spawner=MpiExecSpawner,
+        )
+    else:
+        return InteractiveStepExecutor(
+            max_cores=max_cores,
+            max_workers=max_workers,
+            executor_kwargs=resource_dict,
+            spawner=MpiExecSpawner,
+        )
diff --git a/executorlib/interfaces/slurm.py b/executorlib/interfaces/slurm.py
new file mode 100644
index 00000000..308c1f31
--- /dev/null
+++ b/executorlib/interfaces/slurm.py
@@ -0,0 +1,458 @@
+from typing import Callable, Optional, Union
+
+from executorlib.interactive.executor import ExecutorWithDependencies
+from executorlib.interactive.shared import (
+    InteractiveExecutor,
+    InteractiveStepExecutor,
+)
+from executorlib.interactive.slurm import SrunSpawner
+from executorlib.interactive.slurm import (
+    validate_max_workers as validate_max_workers_slurm,
+)
+from executorlib.standalone.inputcheck import (
+    check_init_function,
+    check_plot_dependency_graph,
+    check_refresh_rate,
+    validate_number_of_cores,
+)
+
+
+class SlurmClusterExecutor:
+    """
+    The SlurmClusterExecutor leverages either the message passing interface (MPI), the SLURM workload manager or
+    preferably the flux framework for distributing python functions within a given resource allocation. In contrast
+    to the mpi4py.futures.MPIPoolExecutor the SlurmClusterExecutor can be executed in a serial python process and
+    does not require the python script to be executed with MPI. It is even possible to execute the
+    SlurmClusterExecutor directly in an interactive Jupyter notebook.
+
+    Args:
+        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
+                           of cores which can be used in parallel - just like the max_cores parameter. Using max_cores
+                           is recommended, as computers have a limited number of compute cores.
+        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
+        max_cores (int): defines the number of cores which can be used in parallel
+        resource_dict (dict): A dictionary of resources required by the task, with the following keys:
+                              - cores (int): number of MPI cores to be used for each function call
+                              - threads_per_core (int): number of OpenMP threads to be used for each function call
+                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
+                              - cwd (str/None): current working directory where the parallel python task is executed
+                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
+                                                              SLURM only) - default False
+                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
+        pysqa_config_directory (str, optional): path to the pysqa config directory (only for the pysqa based backend).
+        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
+                                      context of an HPC cluster this is essential to be able to communicate with an
+                                      Executor running on a different compute node within the same allocation. In
+                                      principle any computer should be able to resolve that its own hostname points to
+                                      the same address as localhost, but macOS >= 12 seems to disable this lookup for
+                                      security reasons, so on macOS this option has to be set to True.
+        block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource
+                                    requirements, executorlib supports block allocation. In this case all resources
+                                    have to be defined on the executor, rather than during the submission of the
+                                    individual function.
+        init_function (None): optional function to preset arguments for functions which are submitted later
+        disable_dependencies (boolean): Disable resolving future objects during the submission.
+        refresh_rate (float): Set the refresh rate in seconds, i.e. how frequently the input queue is checked.
+        plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
+                                      debugging purposes and to get an overview of the specified dependencies.
+        plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
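To make the `plot_dependency_graph` option above concrete, here is a minimal sketch, modeled on `tests/test_plot_dependency.py`; it is illustrative only and assumes `pygraphviz` is installed. With `plot_dependency_graph=True` nothing is handed to SLURM; the submitted functions are only recorded so the task graph can be inspected:

```python
from executorlib import SlurmClusterExecutor


def add_function(parameter_1, parameter_2):
    return parameter_1 + parameter_2


# With plot_dependency_graph=True the submitted functions are not executed;
# the executor only records the task graph for inspection on exit.
with SlurmClusterExecutor(plot_dependency_graph=True) as exe:
    future_1 = exe.submit(add_function, 1, parameter_2=2)
    future_2 = exe.submit(add_function, 1, parameter_2=future_1)
```

Setting `plot_dependency_graph_filename` instead writes the rendered graph to a file, as exercised in `tests/test_plot_dependency.py`.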
+
+    Examples:
+        ```
+        >>> import numpy as np
+        >>> from executorlib.interfaces.slurm import SlurmClusterExecutor
+        >>>
+        >>> def calc(i, j, k):
+        >>>     from mpi4py import MPI
+        >>>     size = MPI.COMM_WORLD.Get_size()
+        >>>     rank = MPI.COMM_WORLD.Get_rank()
+        >>>     return np.array([i, j, k]), size, rank
+        >>>
+        >>> def init_k():
+        >>>     return {"k": 3}
+        >>>
+        >>> with SlurmClusterExecutor(max_workers=2, init_function=init_k) as p:
+        >>>     fs = p.submit(calc, 2, j=4)
+        >>>     print(fs.result())
+        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
+        ```
+    """
+
+    def __init__(
+        self,
+        max_workers: Optional[int] = None,
+        cache_directory: Optional[str] = None,
+        max_cores: Optional[int] = None,
+        resource_dict: Optional[dict] = None,
+        pysqa_config_directory: Optional[str] = None,
+        hostname_localhost: Optional[bool] = None,
+        block_allocation: bool = False,
+        init_function: Optional[Callable] = None,
+        disable_dependencies: bool = False,
+        refresh_rate: float = 0.01,
+        plot_dependency_graph: bool = False,
+        plot_dependency_graph_filename: Optional[str] = None,
+    ):
+        # The instance is created in __new__(); __init__() is only implemented to enable auto-completion.
+        pass
+
+    def __new__(
+        cls,
+        max_workers: Optional[int] = None,
+        cache_directory: Optional[str] = None,
+        max_cores: Optional[int] = None,
+        resource_dict: Optional[dict] = None,
+        pysqa_config_directory: Optional[str] = None,
+        hostname_localhost: Optional[bool] = None,
+        block_allocation: bool = False,
+        init_function: Optional[Callable] = None,
+        disable_dependencies: bool = False,
+        refresh_rate: float = 0.01,
+        plot_dependency_graph: bool = False,
+        plot_dependency_graph_filename: Optional[str] = None,
+    ):
+        """
+        Instead of returning an executorlib.Executor object this function returns either an
+        executorlib.mpi.PyMPIExecutor, an executorlib.slurm.PySlurmExecutor or an executorlib.flux.PyFluxExecutor,
+        depending on which backend is available. The executorlib.flux.PyFluxExecutor is the preferred choice, while
+        the executorlib.mpi.PyMPIExecutor is primarily used for development and testing. The
+        executorlib.flux.PyFluxExecutor requires flux-core from the flux-framework to be installed and, in addition,
+        flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor requires the SLURM
+        workload manager to be installed on the system.
+
+        Args:
+            max_workers (int): for backwards compatibility with the standard library, max_workers also defines the
+                               number of cores which can be used in parallel - just like the max_cores parameter.
+                               Using max_cores is recommended, as computers have a limited number of compute cores.
+            cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
+            max_cores (int): defines the number of cores which can be used in parallel
+            resource_dict (dict): A dictionary of resources required by the task, with the following keys:
+                                  - cores (int): number of MPI cores to be used for each function call
+                                  - threads_per_core (int): number of OpenMP threads to be used for each function call
+                                  - gpus_per_core (int): number of GPUs per worker - defaults to 0
+                                  - cwd (str/None): current working directory where the parallel python task is
+                                                    executed
+                                  - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
+                                                                  and SLURM only) - default False
+                                  - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
+                                                           only)
+            pysqa_config_directory (str, optional): path to the pysqa config directory (only for the pysqa based
+                                                    backend).
+            hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
+                                          context of an HPC cluster this is essential to be able to communicate with an
+                                          Executor running on a different compute node within the same allocation. In
+                                          principle any computer should be able to resolve that its own hostname points
+                                          to the same address as localhost, but macOS >= 12 seems to disable this
+                                          lookup for security reasons, so on macOS this option has to be set to True.
+            block_allocation (boolean): To accelerate the submission of a series of python functions with the same
+                                        resource requirements, executorlib supports block allocation. In this case all
+                                        resources have to be defined on the executor, rather than during the submission
+                                        of the individual function.
+            init_function (None): optional function to preset arguments for functions which are submitted later
+            disable_dependencies (boolean): Disable resolving future objects during the submission.
+            refresh_rate (float): Set the refresh rate in seconds, i.e. how frequently the input queue is checked.
+            plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
+                                          debugging purposes and to get an overview of the specified dependencies.
+            plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
+
+        """
+        default_resource_dict: dict = {
+            "cores": 1,
+            "threads_per_core": 1,
+            "gpus_per_core": 0,
+            "cwd": None,
+            "openmpi_oversubscribe": False,
+            "slurm_cmd_args": [],
+        }
+        if resource_dict is None:
+            resource_dict = {}
+        resource_dict.update(
+            {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
+        )
+        if not plot_dependency_graph:
+            from executorlib.cache.executor import create_file_executor
+
+            return create_file_executor(
+                max_workers=max_workers,
+                backend="slurm_submission",
+                max_cores=max_cores,
+                cache_directory=cache_directory,
+                resource_dict=resource_dict,
+                flux_executor=None,
+                flux_executor_pmi_mode=None,
+                flux_executor_nesting=False,
+                flux_log_files=False,
+                pysqa_config_directory=pysqa_config_directory,
+                hostname_localhost=hostname_localhost,
+                block_allocation=block_allocation,
+                init_function=init_function,
+                disable_dependencies=disable_dependencies,
+            )
+        else:
+            return ExecutorWithDependencies(
+                executor=create_slurm_executor(
+                    max_workers=max_workers,
+                    cache_directory=cache_directory,
+                    max_cores=max_cores,
+                    resource_dict=resource_dict,
+                    hostname_localhost=hostname_localhost,
+                    block_allocation=block_allocation,
+                    init_function=init_function,
+                ),
+                max_cores=max_cores,
+                refresh_rate=refresh_rate,
+                plot_dependency_graph=plot_dependency_graph,
+                plot_dependency_graph_filename=plot_dependency_graph_filename,
+            )
+
+
+class SlurmJobExecutor:
+    """
+    The SlurmJobExecutor leverages either the message passing interface (MPI), the SLURM workload manager or
+    preferably the flux framework for distributing python functions within a given resource allocation. In contrast
+    to the mpi4py.futures.MPIPoolExecutor the SlurmJobExecutor can be executed in a serial python process and does
+    not require the python script to be executed with MPI. It is even possible to execute the SlurmJobExecutor
+    directly in an interactive Jupyter notebook.
+
+    Args:
+        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
+                           of cores which can be used in parallel - just like the max_cores parameter. Using max_cores
+                           is recommended, as computers have a limited number of compute cores.
+        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
+        max_cores (int): defines the number of cores which can be used in parallel
+        resource_dict (dict): A dictionary of resources required by the task, with the following keys:
+                              - cores (int): number of MPI cores to be used for each function call
+                              - threads_per_core (int): number of OpenMP threads to be used for each function call
+                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
+                              - cwd (str/None): current working directory where the parallel python task is executed
+                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
+                                                              SLURM only) - default False
+                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
+        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
+                                      context of an HPC cluster this is essential to be able to communicate with an
+                                      Executor running on a different compute node within the same allocation. In
+                                      principle any computer should be able to resolve that its own hostname points to
+                                      the same address as localhost, but macOS >= 12 seems to disable this lookup for
+                                      security reasons, so on macOS this option has to be set to True.
+        block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource
+                                    requirements, executorlib supports block allocation. In this case all resources
+                                    have to be defined on the executor, rather than during the submission of the
+                                    individual function.
+        init_function (None): optional function to preset arguments for functions which are submitted later
+        disable_dependencies (boolean): Disable resolving future objects during the submission.
+        refresh_rate (float): Set the refresh rate in seconds, i.e. how frequently the input queue is checked.
+        plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
+                                      debugging purposes and to get an overview of the specified dependencies.
+        plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
+
+    Examples:
+        ```
+        >>> import numpy as np
+        >>> from executorlib.interfaces.slurm import SlurmJobExecutor
+        >>>
+        >>> def calc(i, j, k):
+        >>>     from mpi4py import MPI
+        >>>     size = MPI.COMM_WORLD.Get_size()
+        >>>     rank = MPI.COMM_WORLD.Get_rank()
+        >>>     return np.array([i, j, k]), size, rank
+        >>>
+        >>> def init_k():
+        >>>     return {"k": 3}
+        >>>
+        >>> with SlurmJobExecutor(max_workers=2, init_function=init_k) as p:
+        >>>     fs = p.submit(calc, 2, j=4)
+        >>>     print(fs.result())
+        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
+        ```
+    """
+
+    def __init__(
+        self,
+        max_workers: Optional[int] = None,
+        cache_directory: Optional[str] = None,
+        max_cores: Optional[int] = None,
+        resource_dict: Optional[dict] = None,
+        hostname_localhost: Optional[bool] = None,
+        block_allocation: bool = False,
+        init_function: Optional[Callable] = None,
+        disable_dependencies: bool = False,
+        refresh_rate: float = 0.01,
+        plot_dependency_graph: bool = False,
+        plot_dependency_graph_filename: Optional[str] = None,
+    ):
+        # The instance is created in __new__(); __init__() is only implemented to enable auto-completion.
+        pass
+
+    def __new__(
+        cls,
+        max_workers: Optional[int] = None,
+        cache_directory: Optional[str] = None,
+        max_cores: Optional[int] = None,
+        resource_dict: Optional[dict] = None,
+        hostname_localhost: Optional[bool] = None,
+        block_allocation: bool = False,
+        init_function: Optional[Callable] = None,
+        disable_dependencies: bool = False,
+        refresh_rate: float = 0.01,
+        plot_dependency_graph: bool = False,
+        plot_dependency_graph_filename: Optional[str] = None,
+    ):
+        """
+        Instead of returning an executorlib.Executor object this function returns either an
+        executorlib.mpi.PyMPIExecutor, an executorlib.slurm.PySlurmExecutor or an executorlib.flux.PyFluxExecutor,
+        depending on which backend is available. The executorlib.flux.PyFluxExecutor is the preferred choice, while
+        the executorlib.mpi.PyMPIExecutor is primarily used for development and testing. The
+        executorlib.flux.PyFluxExecutor requires flux-core from the flux-framework to be installed and, in addition,
+        flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor requires the SLURM
+        workload manager to be installed on the system.
+
+        Args:
+            max_workers (int): for backwards compatibility with the standard library, max_workers also defines the
+                               number of cores which can be used in parallel - just like the max_cores parameter.
+                               Using max_cores is recommended, as computers have a limited number of compute cores.
+            cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
+            max_cores (int): defines the number of cores which can be used in parallel
+            resource_dict (dict): A dictionary of resources required by the task, with the following keys:
+                                  - cores (int): number of MPI cores to be used for each function call
+                                  - threads_per_core (int): number of OpenMP threads to be used for each function call
+                                  - gpus_per_core (int): number of GPUs per worker - defaults to 0
+                                  - cwd (str/None): current working directory where the parallel python task is
+                                                    executed
+                                  - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
+                                                                  and SLURM only) - default False
+                                  - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
+                                                           only)
+            hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
+                                          context of an HPC cluster this is essential to be able to communicate with an
+                                          Executor running on a different compute node within the same allocation. In
+                                          principle any computer should be able to resolve that its own hostname points
+                                          to the same address as localhost, but macOS >= 12 seems to disable this
+                                          lookup for security reasons, so on macOS this option has to be set to True.
+            block_allocation (boolean): To accelerate the submission of a series of python functions with the same
+                                        resource requirements, executorlib supports block allocation. In this case all
+                                        resources have to be defined on the executor, rather than during the submission
+                                        of the individual function.
+            init_function (None): optional function to preset arguments for functions which are submitted later
+            disable_dependencies (boolean): Disable resolving future objects during the submission.
+            refresh_rate (float): Set the refresh rate in seconds, i.e. how frequently the input queue is checked.
+            plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
+                                          debugging purposes and to get an overview of the specified dependencies.
+            plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
+
+        """
+        default_resource_dict: dict = {
+            "cores": 1,
+            "threads_per_core": 1,
+            "gpus_per_core": 0,
+            "cwd": None,
+            "openmpi_oversubscribe": False,
+            "slurm_cmd_args": [],
+        }
+        if resource_dict is None:
+            resource_dict = {}
+        resource_dict.update(
+            {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
+        )
+        if not disable_dependencies:
+            return ExecutorWithDependencies(
+                executor=create_slurm_executor(
+                    max_workers=max_workers,
+                    cache_directory=cache_directory,
+                    max_cores=max_cores,
+                    resource_dict=resource_dict,
+                    hostname_localhost=hostname_localhost,
+                    block_allocation=block_allocation,
+                    init_function=init_function,
+                ),
+                max_cores=max_cores,
+                refresh_rate=refresh_rate,
+                plot_dependency_graph=plot_dependency_graph,
+                plot_dependency_graph_filename=plot_dependency_graph_filename,
+            )
+        else:
+            check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph)
+            check_refresh_rate(refresh_rate=refresh_rate)
+            return create_slurm_executor(
+                max_workers=max_workers,
+                cache_directory=cache_directory,
+                max_cores=max_cores,
+                resource_dict=resource_dict,
+                hostname_localhost=hostname_localhost,
+                block_allocation=block_allocation,
+                init_function=init_function,
+            )
+
+
+def create_slurm_executor(
+    max_workers: Optional[int] = None,
+    max_cores: Optional[int] = None,
+    cache_directory: Optional[str] = None,
+    resource_dict: Optional[dict] = None,
+    hostname_localhost: Optional[bool] = None,
+    block_allocation: bool = False,
+    init_function: Optional[Callable] = None,
+) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
+    """
+    Create a SLURM executor
+
+    Args:
+        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
+                           of cores which can be used in parallel - just like the max_cores parameter. Using max_cores
+                           is recommended, as computers have a limited number of compute cores.
+        max_cores (int): defines the number of cores which can be used in parallel
+        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
+        resource_dict (dict): A dictionary of resources required by the task, with the following keys:
+                              - cores (int): number of MPI cores to be used for each function call
+                              - threads_per_core (int): number of OpenMP threads to be used for each function call
+                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
+                              - cwd (str/None): current working directory where the parallel python task is executed
+                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
+                                                              SLURM only) - default False
+                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
+        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
+                                      context of an HPC cluster this is essential to be able to communicate with an
+                                      Executor running on a different compute node within the same allocation. In
+                                      principle any computer should be able to resolve that its own hostname points to
+                                      the same address as localhost, but macOS >= 12 seems to disable this lookup for
+                                      security reasons, so on macOS this option has to be set to True.
+        block_allocation (boolean): To accelerate the submission of a series of python functions with the same
+                                    resource requirements, executorlib supports block allocation.
In this case all + resources have to be defined on the executor, rather than during the submission + of the individual function. + init_function (None): optional function to preset arguments for functions which are submitted later + + Returns: + InteractiveStepExecutor/ InteractiveExecutor + """ + if resource_dict is None: + resource_dict = {} + cores_per_worker = resource_dict.get("cores", 1) + resource_dict["cache_directory"] = cache_directory + resource_dict["hostname_localhost"] = hostname_localhost + check_init_function(block_allocation=block_allocation, init_function=init_function) + if block_allocation: + resource_dict["init_function"] = init_function + max_workers = validate_number_of_cores( + max_cores=max_cores, + max_workers=max_workers, + cores_per_worker=cores_per_worker, + set_local_cores=False, + ) + validate_max_workers_slurm( + max_workers=max_workers, + cores=cores_per_worker, + threads_per_core=resource_dict.get("threads_per_core", 1), + ) + return InteractiveExecutor( + max_workers=max_workers, + executor_kwargs=resource_dict, + spawner=SrunSpawner, + ) + else: + return InteractiveStepExecutor( + max_cores=max_cores, + max_workers=max_workers, + executor_kwargs=resource_dict, + spawner=SrunSpawner, + ) diff --git a/notebooks/1-local.ipynb b/notebooks/1-local.ipynb index 861491f3..d8a70d91 100644 --- a/notebooks/1-local.ipynb +++ b/notebooks/1-local.ipynb @@ -26,9 +26,7 @@ "id": "b1907f12-7378-423b-9b83-1b65fc0a20f5", "metadata": {}, "outputs": [], - "source": [ - "from executorlib import Executor" - ] + "source": "from executorlib import SingleNodeExecutor" }, { "cell_type": "markdown", @@ -56,7 +54,7 @@ ], "source": [ "%%time\n", - "with Executor(backend=\"local\") as exe:\n", + "with SingleNodeExecutor() as exe:\n", " future = exe.submit(sum, [1, 1])\n", " print(future.result())" ] @@ -87,7 +85,7 @@ ], "source": [ "%%time\n", - "with Executor(backend=\"local\") as exe:\n", + "with SingleNodeExecutor() as exe:\n", " future_lst = [exe.submit(sum, [i, i]) for i in range(2, 5)]\n", " print([f.result() for f in future_lst])" ] @@ -118,7 +116,7 @@ ], "source": [ "%%time\n", - "with Executor(backend=\"local\") as exe:\n", + "with SingleNodeExecutor() as exe:\n", " results = exe.map(sum, [[5, 5], [6, 6], [7, 7]])\n", " print(list(results))" ] @@ -191,7 +189,7 @@ } ], "source": [ - "with Executor(backend=\"local\") as exe:\n", + "with SingleNodeExecutor() as exe:\n", " fs = exe.submit(calc_mpi, 3, resource_dict={\"cores\": 2})\n", " print(fs.result())" ] @@ -219,7 +217,7 @@ } ], "source": [ - "with Executor(backend=\"local\", resource_dict={\"cores\": 2}) as exe:\n", + "with SingleNodeExecutor(resource_dict={\"cores\": 2}) as exe:\n", " fs = exe.submit(calc_mpi, 3)\n", " print(fs.result())" ] @@ -285,7 +283,7 @@ } ], "source": [ - "with Executor(backend=\"local\") as exe:\n", + "with SingleNodeExecutor() as exe:\n", " fs = exe.submit(calc_with_threads, 3, resource_dict={\"threads_per_core\": 2})\n", " print(fs.result())" ] @@ -313,7 +311,7 @@ } ], "source": [ - "with Executor(backend=\"local\", resource_dict={\"threads_per_core\": 2}) as exe:\n", + "with SingleNodeExecutor(resource_dict={\"threads_per_core\": 2}) as exe:\n", " fs = exe.submit(calc_with_threads, 3)\n", " print(fs.result())" ] @@ -364,7 +362,7 @@ ], "source": [ "%%time\n", - "with Executor(max_workers=2, backend=\"local\", block_allocation=True) as exe:\n", + "with SingleNodeExecutor(max_workers=2, block_allocation=True) as exe:\n", " future = exe.submit(sum, [1, 1])\n", " print(future.result())" ] @@ -457,9 
+455,7 @@ } ], "source": [ - "with Executor(\n", - " backend=\"local\", resource_dict={\"cores\": 2}, block_allocation=True\n", - ") as exe:\n", + "with SingleNodeExecutor(resource_dict={\"cores\": 2}, block_allocation=True) as exe:\n", " fs = exe.submit(calc_mpi, 3)\n", " print(fs.result())" ] @@ -517,9 +513,7 @@ } ], "source": [ - "with Executor(\n", - " backend=\"local\", init_function=init_function, block_allocation=True\n", - ") as exe:\n", + "with SingleNodeExecutor(init_function=init_function, block_allocation=True) as exe:\n", " fs = exe.submit(calc_with_preload, 2, j=5)\n", " print(fs.result())" ] @@ -561,7 +555,7 @@ ], "source": [ "%%time\n", - "with Executor(backend=\"local\", cache_directory=\"./cache\") as exe:\n", + "with SingleNodeExecutor(cache_directory=\"./cache\") as exe:\n", " future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n", " print([f.result() for f in future_lst])" ] @@ -594,7 +588,7 @@ ], "source": [ "%%time\n", - "with Executor(backend=\"local\", cache_directory=\"./cache\") as exe:\n", + "with SingleNodeExecutor(cache_directory=\"./cache\") as exe:\n", " future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n", " print([f.result() for f in future_lst])" ] @@ -687,7 +681,7 @@ } ], "source": [ - "with Executor(backend=\"local\") as exe:\n", + "with SingleNodeExecutor() as exe:\n", " future = 0\n", " for i in range(1, 4):\n", " future = exe.submit(calc_add, i, future)\n", @@ -815,7 +809,7 @@ } ], "source": [ - "with Executor(backend=\"local\", plot_dependency_graph=True) as exe:\n", + "with SingleNodeExecutor(plot_dependency_graph=True) as exe:\n", " future = 0\n", " for i in range(1, 4):\n", " future = exe.submit(calc_add, i, future)\n", diff --git a/notebooks/2-hpc-submission.ipynb b/notebooks/2-hpc-submission.ipynb index f0998333..e62b4f0a 100644 --- a/notebooks/2-hpc-submission.ipynb +++ b/notebooks/2-hpc-submission.ipynb @@ -28,7 +28,7 @@ "metadata": {}, "source": [ "```python\n", - "from executorlib import Executor\n", + "from executorlib import SlurmClusterExecutor\n", "```" ] }, @@ -46,7 +46,7 @@ "metadata": {}, "source": [ "```python\n", - "with Executor(backend=\"slurm_submission\", cache_directory=\"./cache\") as exe:\n", + "with SlurmClusterExecutor(cache_directory=\"./cache\") as exe:\n", " future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n", " print([f.result() for f in future_lst])\n", "```" ] @@ -87,7 +87,7 @@ "{{command}}\n", "\"\"\"\n", "\n", - "with Executor(backend=\"slurm_submission\", cache_directory=\"./cache\") as exe:\n", + "with SlurmClusterExecutor(cache_directory=\"./cache\") as exe:\n", " future = exe.submit(\n", " sum, [4, 4], \n", " resource_dict={\n", @@ -141,7 +141,9 @@ "metadata": {}, "source": [ "```python\n", - "with Executor(backend=\"flux_submission\", cache_directory=\"./cache\") as exe:\n", + "from executorlib import FluxClusterExecutor\n", + "\n", + "with FluxClusterExecutor(cache_directory=\"./cache\") as exe:\n", " future = 0\n", " for i in range(4, 8):\n", " future = exe.submit(add_funct, i, future)\n", @@ -181,7 +183,7 @@ "metadata": {}, "source": [ "```python\n", - "with Executor(backend=\"flux_submission\", cache_directory=\"./cache\") as exe:\n", + "with FluxClusterExecutor(cache_directory=\"./cache\") as exe:\n", " fs = exe.submit(calc, 3, resource_dict={\"cores\": 2})\n", " print(fs.result())\n", "```" ] @@ -205,8 +207,7 @@ "```\n", "\n", "```python\n", - "with Executor(\n", - " backend=\"flux_submission\",\n", + "with FluxClusterExecutor(\n", "
cache_directory=\"./cache\",\n", " resource_dict={\"gpus_per_core\": 1}\n", ") as exe:\n", diff --git a/notebooks/3-hpc-allocation.ipynb b/notebooks/3-hpc-allocation.ipynb index 1d4ef3cf..fc0f84c6 100644 --- a/notebooks/3-hpc-allocation.ipynb +++ b/notebooks/3-hpc-allocation.ipynb @@ -32,7 +32,7 @@ "metadata": {}, "outputs": [], "source": [ - "from executorlib import Executor" + "from executorlib import SlurmJobExecutor" ] }, { @@ -41,7 +41,7 @@ "metadata": {}, "source": [ "```python\n", - "with Executor(backend=\"slurm_allocation\") as exe:\n", + "with SlurmJobExecutor() as exe:\n", " future = exe.submit(sum, [1, 1])\n", " print(future.result())\n", "```" ] @@ -111,7 +111,9 @@ } ], "source": [ - "with Executor(backend=\"flux_allocation\", flux_executor_pmi_mode=\"pmix\") as exe:\n", + "from executorlib import FluxJobExecutor\n", + "\n", + "with FluxJobExecutor(flux_executor_pmi_mode=\"pmix\") as exe:\n", " fs = exe.submit(calc_mpi, 3, resource_dict={\"cores\": 2})\n", " print(fs.result())" ] @@ -162,8 +164,7 @@ } ], "source": [ - "with Executor(\n", - " backend=\"flux_allocation\",\n", + "with FluxJobExecutor(\n", " flux_executor_pmi_mode=\"pmix\",\n", " max_workers=2,\n", " init_function=init_function,\n", @@ -218,7 +219,7 @@ } ], "source": [ - "with Executor(backend=\"flux_allocation\", flux_executor_pmi_mode=\"pmix\") as exe:\n", + "with FluxJobExecutor(flux_executor_pmi_mode=\"pmix\") as exe:\n", " future = 0\n", " for i in range(1, 4):\n", " future = exe.submit(add_funct, i, future)\n", @@ -249,9 +250,7 @@ } ], "source": [ - "with Executor(\n", - " backend=\"flux_allocation\", flux_executor_pmi_mode=\"pmix\", cache_directory=\"./cache\"\n", - ") as exe:\n", + "with FluxJobExecutor(flux_executor_pmi_mode=\"pmix\", cache_directory=\"./cache\") as exe:\n", " future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n", " print([f.result() for f in future_lst])" ] @@ -300,9 +299,9 @@ "outputs": [], "source": [ "def calc_nested():\n", - " from executorlib import Executor\n", + " from executorlib import FluxJobExecutor\n", "\n", - " with Executor(backend=\"flux_allocation\", flux_executor_pmi_mode=\"pmix\") as exe:\n", + " with FluxJobExecutor(flux_executor_pmi_mode=\"pmix\") as exe:\n", " fs = exe.submit(sum, [1, 1])\n", " return fs.result()" ] @@ -322,9 +321,7 @@ } ], "source": [ - "with Executor(\n", - " backend=\"flux_allocation\", flux_executor_pmi_mode=\"pmix\", flux_executor_nesting=True\n", - ") as exe:\n", + "with FluxJobExecutor(flux_executor_pmi_mode=\"pmix\", flux_executor_nesting=True) as exe:\n", " fs = exe.submit(calc_nested)\n", " print(fs.result())" ] diff --git a/notebooks/4-developer.ipynb b/notebooks/4-developer.ipynb index 5bda8e1a..e76c3f88 100644 --- a/notebooks/4-developer.ipynb +++ b/notebooks/4-developer.ipynb @@ -56,9 +56,7 @@ "id": "83515b16-c4d5-4b02-acd7-9e1eb57fd335", "metadata": {}, "outputs": [], - "source": [ - "from executorlib import Executor" - ] + "source": "from executorlib import SingleNodeExecutor" }, { "cell_type": "code", @@ -93,7 +91,7 @@ } ], "source": [ - "with Executor(backend=\"local\") as exe:\n", + "with SingleNodeExecutor() as exe:\n", " future = exe.submit(\n", " execute_shell_command,\n", " [\"echo\", \"test\"],\n", @@ -250,7 +248,7 @@ } ], "source": [ - "with Executor(\n", + "with SingleNodeExecutor(\n", " max_workers=1,\n", " init_function=init_process,\n", " block_allocation=True,\n", diff --git a/tests/benchmark/llh.py b/tests/benchmark/llh.py index 3fae9ad2..d1488d21 100644 --- a/tests/benchmark/llh.py +++ 
b/tests/benchmark/llh.py @@ -42,39 +42,36 @@ def run_static(mean=0.1, sigma=1.1, runs=32): executor=ThreadPoolExecutor, mean=0.1, sigma=1.1, runs=32, max_workers=4 ) elif run_mode == "block_allocation": - from executorlib import Executor + from executorlib import SingleNodeExecutor run_with_executor( - executor=Executor, + executor=SingleNodeExecutor, mean=0.1, sigma=1.1, runs=32, max_cores=4, - backend="local", block_allocation=True, ) elif run_mode == "executorlib": - from executorlib import Executor + from executorlib import SingleNodeExecutor run_with_executor( - executor=Executor, + executor=SingleNodeExecutor, mean=0.1, sigma=1.1, runs=32, max_cores=4, - backend="local", block_allocation=False, ) elif run_mode == "flux": - from executorlib import Executor + from executorlib import FluxJobExecutor run_with_executor( - executor=Executor, + executor=FluxJobExecutor, mean=0.1, sigma=1.1, runs=32, max_cores=4, - backend="flux", block_allocation=True, ) elif run_mode == "mpi4py": diff --git a/tests/test_cache_executor_interactive.py b/tests/test_cache_executor_interactive.py index 35f08a5c..cc5752da 100644 --- a/tests/test_cache_executor_interactive.py +++ b/tests/test_cache_executor_interactive.py @@ -2,7 +2,7 @@ import shutil import unittest -from executorlib import Executor +from executorlib import SingleNodeExecutor try: from executorlib.standalone.hdf import get_cache_data @@ -18,7 +18,7 @@ class TestCacheFunctions(unittest.TestCase): def test_cache_data(self): cache_directory = "./cache" - with Executor(backend="local", cache_directory=cache_directory) as exe: + with SingleNodeExecutor(cache_directory=cache_directory) as exe: future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)] result_lst = [f.result() for f in future_lst] diff --git a/tests/test_cache_executor_pysqa_flux.py b/tests/test_cache_executor_pysqa_flux.py index 827fd455..4a35c488 100644 --- a/tests/test_cache_executor_pysqa_flux.py +++ b/tests/test_cache_executor_pysqa_flux.py @@ -3,7 +3,7 @@ import unittest import shutil -from executorlib import Executor +from executorlib import FluxClusterExecutor from executorlib.standalone.serialize import cloudpickle_register try: @@ -32,8 +32,7 @@ def mpi_funct(i): ) class TestCacheExecutorPysqa(unittest.TestCase): def test_executor(self): - with Executor( - backend="flux_submission", + with FluxClusterExecutor( resource_dict={"cores": 2, "cwd": "cache"}, block_allocation=False, cache_directory="cache", diff --git a/tests/test_dependencies_executor.py b/tests/test_dependencies_executor.py index 32c9400f..aaf21a7d 100644 --- a/tests/test_dependencies_executor.py +++ b/tests/test_dependencies_executor.py @@ -1,13 +1,11 @@ from concurrent.futures import Future -import os import unittest from time import sleep from queue import Queue -from executorlib import Executor -from executorlib.interactive.create import create_executor +from executorlib import SingleNodeExecutor +from executorlib.interfaces.single import create_single_node_executor from executorlib.interactive.shared import execute_tasks_with_dependencies -from executorlib.standalone.plot import generate_nodes_and_edges from executorlib.standalone.serialize import cloudpickle_register from executorlib.standalone.thread import RaisingThread @@ -46,62 +44,12 @@ def raise_error(): class TestExecutorWithDependencies(unittest.TestCase): def test_executor(self): - with Executor(max_cores=1, backend="local") as exe: + with SingleNodeExecutor(max_cores=1) as exe: cloudpickle_register(ind=1) future_1 = exe.submit(add_function, 1, 
parameter_2=2) future_2 = exe.submit(add_function, 1, parameter_2=future_1) self.assertEqual(future_2.result(), 4) - @unittest.skipIf( - skip_graphviz_test, - "graphviz is not installed, so the plot_dependency_graph tests are skipped.", - ) - def test_executor_dependency_plot(self): - with Executor( - max_cores=1, - backend="local", - plot_dependency_graph=True, - ) as exe: - cloudpickle_register(ind=1) - future_1 = exe.submit(add_function, 1, parameter_2=2) - future_2 = exe.submit(add_function, 1, parameter_2=future_1) - self.assertTrue(future_1.done()) - self.assertTrue(future_2.done()) - self.assertEqual(len(exe._future_hash_dict), 2) - self.assertEqual(len(exe._task_hash_dict), 2) - nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, - future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() - }, - ) - self.assertEqual(len(nodes), 5) - self.assertEqual(len(edges), 4) - - @unittest.skipIf( - skip_graphviz_test, - "graphviz is not installed, so the plot_dependency_graph tests are skipped.", - ) - def test_executor_dependency_plot_filename(self): - graph_file = os.path.join(os.path.dirname(__file__), "test.png") - with Executor( - max_cores=1, - backend="local", - plot_dependency_graph=False, - plot_dependency_graph_filename=graph_file, - ) as exe: - cloudpickle_register(ind=1) - future_1 = exe.submit(add_function, 1, parameter_2=2) - future_2 = exe.submit(add_function, 1, parameter_2=future_1) - self.assertTrue(future_1.done()) - self.assertTrue(future_2.done()) - self.assertTrue(os.path.exists(graph_file)) - os.remove(graph_file) - - def test_create_executor_error(self): - with self.assertRaises(ValueError): - create_executor(backend="toast", resource_dict={"cores": 1}) - def test_dependency_steps(self): cloudpickle_register(ind=1) fs1 = Future() @@ -125,7 +73,7 @@ def test_dependency_steps(self): "resource_dict": {"cores": 1}, } ) - executor = create_executor( + executor = create_single_node_executor( max_workers=1, max_cores=2, resource_dict={ @@ -136,7 +84,6 @@ def test_dependency_steps(self): "openmpi_oversubscribe": False, "slurm_cmd_args": [], }, - backend="local", ) process = RaisingThread( target=execute_tasks_with_dependencies, @@ -158,7 +105,7 @@ def test_dependency_steps(self): def test_many_to_one(self): length = 5 parameter = 1 - with Executor(max_cores=2, backend="local") as exe: + with SingleNodeExecutor(max_cores=2) as exe: cloudpickle_register(ind=1) future_lst = exe.submit( generate_tasks, @@ -183,77 +130,28 @@ def test_many_to_one(self): ) self.assertEqual(future_sum.result(), 15) - @unittest.skipIf( - skip_graphviz_test, - "graphviz is not installed, so the plot_dependency_graph tests are skipped.", - ) - def test_many_to_one_plot(self): - length = 5 - parameter = 1 - with Executor( - max_cores=2, - backend="local", - plot_dependency_graph=True, - ) as exe: - cloudpickle_register(ind=1) - future_lst = exe.submit( - generate_tasks, - length=length, - resource_dict={"cores": 1}, - ) - lst = [] - for i in range(length): - lst.append( - exe.submit( - calc_from_lst, - lst=future_lst, - ind=i, - parameter=parameter, - resource_dict={"cores": 1}, - ) - ) - future_sum = exe.submit( - merge, - lst=lst, - resource_dict={"cores": 1}, - ) - self.assertTrue(future_lst.done()) - for l in lst: - self.assertTrue(l.done()) - self.assertTrue(future_sum.done()) - self.assertEqual(len(exe._future_hash_dict), 7) - self.assertEqual(len(exe._task_hash_dict), 7) - nodes, edges = generate_nodes_and_edges( - task_hash_dict=exe._task_hash_dict, - 
future_hash_inverse_dict={ - v: k for k, v in exe._future_hash_dict.items() - }, - ) - self.assertEqual(len(nodes), 18) - self.assertEqual(len(edges), 21) - class TestExecutorErrors(unittest.TestCase): def test_block_allocation_false_one_worker(self): with self.assertRaises(RuntimeError): - with Executor(max_cores=1, backend="local", block_allocation=False) as exe: + with SingleNodeExecutor(max_cores=1, block_allocation=False) as exe: cloudpickle_register(ind=1) _ = exe.submit(raise_error) def test_block_allocation_true_one_worker(self): with self.assertRaises(RuntimeError): - with Executor(max_cores=1, backend="local", block_allocation=True) as exe: + with SingleNodeExecutor(max_cores=1, block_allocation=True) as exe: cloudpickle_register(ind=1) _ = exe.submit(raise_error) def test_block_allocation_false_two_workers(self): with self.assertRaises(RuntimeError): - with Executor(max_cores=2, backend="local", block_allocation=False) as exe: + with SingleNodeExecutor(max_cores=2, block_allocation=False) as exe: cloudpickle_register(ind=1) _ = exe.submit(raise_error) def test_block_allocation_true_two_workers(self): with self.assertRaises(RuntimeError): - with Executor(max_cores=2, backend="local", block_allocation=True) as exe: + with SingleNodeExecutor(max_cores=2, block_allocation=True) as exe: cloudpickle_register(ind=1) _ = exe.submit(raise_error) diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index d94508e8..c11fcb3e 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -3,7 +3,7 @@ import numpy as np -from executorlib import Executor +from executorlib import FluxJobExecutor try: @@ -44,10 +44,9 @@ def setUp(self): self.executor = flux.job.FluxExecutor() def test_flux_executor_serial(self): - with Executor( + with FluxJobExecutor( max_cores=2, flux_executor=self.executor, - backend="flux_allocation", block_allocation=True, ) as exe: fs_1 = exe.submit(calc, 1) @@ -57,12 +56,25 @@ def test_flux_executor_serial(self): self.assertTrue(fs_1.done()) self.assertTrue(fs_2.done()) + def test_flux_executor_serial_no_depencies(self): + with FluxJobExecutor( + max_cores=2, + flux_executor=self.executor, + block_allocation=True, + disable_dependencies=True, + ) as exe: + fs_1 = exe.submit(calc, 1) + fs_2 = exe.submit(calc, 2) + self.assertEqual(fs_1.result(), 1) + self.assertEqual(fs_2.result(), 2) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) + def test_flux_executor_threads(self): - with Executor( + with FluxJobExecutor( max_cores=1, resource_dict={"threads_per_core": 2}, flux_executor=self.executor, - backend="flux_allocation", block_allocation=True, ) as exe: fs_1 = exe.submit(calc, 1) @@ -73,11 +85,10 @@ def test_flux_executor_threads(self): self.assertTrue(fs_2.done()) def test_flux_executor_parallel(self): - with Executor( + with FluxJobExecutor( max_cores=2, resource_dict={"cores": 2}, flux_executor=self.executor, - backend="flux_allocation", block_allocation=True, flux_executor_pmi_mode=pmi, ) as exe: @@ -86,11 +97,10 @@ def test_flux_executor_parallel(self): self.assertTrue(fs_1.done()) def test_single_task(self): - with Executor( + with FluxJobExecutor( max_cores=2, resource_dict={"cores": 2}, flux_executor=self.executor, - backend="flux_allocation", block_allocation=True, flux_executor_pmi_mode=pmi, ) as p: @@ -105,11 +115,10 @@ def test_output_files_cwd(self): os.makedirs(dirname, exist_ok=True) file_stdout = os.path.join(dirname, "flux.out") file_stderr = os.path.join(dirname, "flux.err") - 
with Executor( + with FluxJobExecutor( max_cores=1, resource_dict={"cores": 1, "cwd": dirname}, flux_executor=self.executor, - backend="flux_allocation", block_allocation=True, flux_log_files=True, ) as p: @@ -126,11 +135,10 @@ def test_output_files_cwd(self): def test_output_files_abs(self): file_stdout = os.path.abspath("flux.out") file_stderr = os.path.abspath("flux.err") - with Executor( + with FluxJobExecutor( max_cores=1, resource_dict={"cores": 1}, flux_executor=self.executor, - backend="flux_allocation", block_allocation=True, flux_log_files=True, ) as p: @@ -145,12 +153,11 @@ def test_output_files_abs(self): os.remove(file_stderr) def test_internal_memory(self): - with Executor( + with FluxJobExecutor( max_cores=1, resource_dict={"cores": 1}, init_function=set_global, flux_executor=self.executor, - backend="flux_allocation", block_allocation=True, ) as p: f = p.submit(get_global) @@ -160,10 +167,9 @@ def test_internal_memory(self): def test_validate_max_workers(self): with self.assertRaises(ValueError): - Executor( + FluxJobExecutor( max_workers=10, resource_dict={"cores": 10, "threads_per_core": 10}, flux_executor=self.executor, - backend="flux_allocation", block_allocation=True, ) diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py index e0b4a3c5..ca7abe39 100644 --- a/tests/test_executor_backend_mpi.py +++ b/tests/test_executor_backend_mpi.py @@ -4,7 +4,7 @@ import time import unittest -from executorlib import Executor +from executorlib import SingleNodeExecutor, SlurmJobExecutor from executorlib.standalone.serialize import cloudpickle_register @@ -34,7 +34,7 @@ def mpi_funct_sleep(i): class TestExecutorBackend(unittest.TestCase): def test_meta_executor_serial(self): - with Executor(max_cores=2, backend="local", block_allocation=True) as exe: + with SingleNodeExecutor(max_cores=2, block_allocation=True) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) @@ -44,7 +44,7 @@ def test_meta_executor_serial(self): self.assertTrue(fs_2.done()) def test_meta_executor_single(self): - with Executor(max_cores=1, backend="local", block_allocation=True) as exe: + with SingleNodeExecutor(max_cores=1, block_allocation=True) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) @@ -55,7 +55,7 @@ def test_meta_executor_single(self): def test_oversubscribe(self): with self.assertRaises(ValueError): - with Executor(max_cores=1, backend="local", block_allocation=True) as exe: + with SingleNodeExecutor(max_cores=1, block_allocation=True) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1, resource_dict={"cores": 2}) @@ -63,10 +63,9 @@ def test_oversubscribe(self): skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." ) def test_meta_executor_parallel(self): - with Executor( + with SingleNodeExecutor( max_workers=2, resource_dict={"cores": 2}, - backend="local", block_allocation=True, ) as exe: cloudpickle_register(ind=1) @@ -76,10 +75,9 @@ def test_meta_executor_parallel(self): def test_errors(self): with self.assertRaises(TypeError): - Executor( + SingleNodeExecutor( max_cores=1, resource_dict={"cores": 1, "gpus_per_core": 1}, - backend="local", ) @@ -91,10 +89,9 @@ def tearDown(self): skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." 
) def test_meta_executor_parallel_cache(self): - with Executor( + with SingleNodeExecutor( max_workers=2, resource_dict={"cores": 2}, - backend="local", block_allocation=True, cache_directory="./cache", ) as exe: @@ -117,10 +114,9 @@ class TestWorkingDirectory(unittest.TestCase): def test_output_files_cwd(self): dirname = os.path.abspath(os.path.dirname(__file__)) os.makedirs(dirname, exist_ok=True) - with Executor( + with SingleNodeExecutor( max_cores=1, resource_dict={"cores": 1, "cwd": dirname}, - backend="local", block_allocation=True, ) as p: output = p.map(calc, [1, 2, 3]) @@ -135,9 +131,8 @@ def test_validate_max_workers(self): os.environ["SLURM_NTASKS"] = "6" os.environ["SLURM_CPUS_PER_TASK"] = "4" with self.assertRaises(ValueError): - Executor( + SlurmJobExecutor( max_workers=10, resource_dict={"cores": 10, "threads_per_core": 10}, - backend="slurm_allocation", block_allocation=True, ) diff --git a/tests/test_executor_backend_mpi_noblock.py b/tests/test_executor_backend_mpi_noblock.py index 47ea2bb3..03f21ef6 100644 --- a/tests/test_executor_backend_mpi_noblock.py +++ b/tests/test_executor_backend_mpi_noblock.py @@ -1,6 +1,6 @@ import unittest -from executorlib import Executor +from executorlib import SingleNodeExecutor from executorlib.standalone.serialize import cloudpickle_register @@ -14,9 +14,8 @@ def resource_dict(resource_dict): class TestExecutorBackend(unittest.TestCase): def test_meta_executor_serial_with_dependencies(self): - with Executor( + with SingleNodeExecutor( max_cores=2, - backend="local", block_allocation=False, disable_dependencies=True, ) as exe: @@ -29,9 +28,8 @@ def test_meta_executor_serial_with_dependencies(self): self.assertTrue(fs_2.done()) def test_meta_executor_serial_without_dependencies(self): - with Executor( + with SingleNodeExecutor( max_cores=2, - backend="local", block_allocation=False, disable_dependencies=False, ) as exe: @@ -44,9 +42,8 @@ def test_meta_executor_serial_without_dependencies(self): self.assertTrue(fs_2.done()) def test_meta_executor_single(self): - with Executor( + with SingleNodeExecutor( max_cores=1, - backend="local", block_allocation=False, ) as exe: cloudpickle_register(ind=1) @@ -59,25 +56,22 @@ def test_meta_executor_single(self): def test_errors(self): with self.assertRaises(TypeError): - Executor( + SingleNodeExecutor( max_cores=1, resource_dict={ "cores": 1, "gpus_per_core": 1, }, - backend="local", ) with self.assertRaises(ValueError): - with Executor( + with SingleNodeExecutor( max_cores=1, - backend="local", block_allocation=False, ) as exe: exe.submit(resource_dict, resource_dict={}) with self.assertRaises(ValueError): - with Executor( + with SingleNodeExecutor( max_cores=1, - backend="local", block_allocation=True, ) as exe: exe.submit(resource_dict, resource_dict={}) diff --git a/tests/test_integration_pyiron_workflow.py b/tests/test_integration_pyiron_workflow.py index 5eb4ba86..f1f8505f 100644 --- a/tests/test_integration_pyiron_workflow.py +++ b/tests/test_integration_pyiron_workflow.py @@ -12,7 +12,7 @@ from typing import Callable import unittest -from executorlib import Executor +from executorlib import SingleNodeExecutor from executorlib.standalone.serialize import cloudpickle_register @@ -75,7 +75,7 @@ def slowly_returns_dynamic(dynamic_arg): return dynamic_arg dynamic_dynamic = slowly_returns_dynamic() - executor = Executor(block_allocation=True, max_workers=1) + executor = SingleNodeExecutor(block_allocation=True, max_workers=1) cloudpickle_register(ind=1) dynamic_object = does_nothing() fs = 
executor.submit(dynamic_dynamic.run, dynamic_object) @@ -105,7 +105,7 @@ def slowly_returns_42(): self.assertIsNone( dynamic_42.result, msg="Just a sanity check that the test is set up right" ) - executor = Executor(block_allocation=True, max_workers=1) + executor = SingleNodeExecutor(block_allocation=True, max_workers=1) cloudpickle_register(ind=1) fs = executor.submit(dynamic_42.run) fs.add_done_callback(dynamic_42.process_result) @@ -136,7 +136,7 @@ def returns_42(): dynamic_42.running, msg="Sanity check that the test starts in the expected condition", ) - executor = Executor(block_allocation=True, max_workers=1) + executor = SingleNodeExecutor(block_allocation=True, max_workers=1) cloudpickle_register(ind=1) fs = executor.submit(dynamic_42.run) fs.add_done_callback(dynamic_42.process_result) @@ -160,7 +160,7 @@ def raise_error(): raise RuntimeError re = raise_error() - executor = Executor(block_allocation=True, max_workers=1) + executor = SingleNodeExecutor(block_allocation=True, max_workers=1) cloudpickle_register(ind=1) fs = executor.submit(re.run) with self.assertRaises( @@ -190,7 +190,7 @@ def slowly_returns_dynamic(): return inside_variable dynamic_dynamic = slowly_returns_dynamic() - executor = Executor(block_allocation=True, max_workers=1) + executor = SingleNodeExecutor(block_allocation=True, max_workers=1) cloudpickle_register(ind=1) fs = executor.submit(dynamic_dynamic.run) self.assertIsInstance( @@ -219,7 +219,7 @@ def slow(): return fortytwo f = slow() - executor = Executor(block_allocation=True, max_workers=1) + executor = SingleNodeExecutor(block_allocation=True, max_workers=1) cloudpickle_register(ind=1) fs = executor.submit(f.run) self.assertEqual( diff --git a/tests/test_plot_dependency.py b/tests/test_plot_dependency.py new file mode 100644 index 00000000..3b79f960 --- /dev/null +++ b/tests/test_plot_dependency.py @@ -0,0 +1,270 @@ +import os +import unittest +from time import sleep + +from executorlib import ( + SingleNodeExecutor, + SlurmJobExecutor, + SlurmClusterExecutor, +) +from executorlib.standalone.plot import generate_nodes_and_edges +from executorlib.standalone.serialize import cloudpickle_register + + +try: + import pygraphviz + + skip_graphviz_test = False +except ImportError: + skip_graphviz_test = True + + +def add_function(parameter_1, parameter_2): + sleep(0.2) + return parameter_1 + parameter_2 + + +def generate_tasks(length): + sleep(0.2) + return range(length) + + +def calc_from_lst(lst, ind, parameter): + sleep(0.2) + return lst[ind] + parameter + + +def merge(lst): + sleep(0.2) + return sum(lst) + + +@unittest.skipIf( + skip_graphviz_test, + "graphviz is not installed, so the plot_dependency_graph tests are skipped.", +) +class TestLocalExecutorWithDependencies(unittest.TestCase): + def test_executor_dependency_plot(self): + with SingleNodeExecutor( + max_cores=1, + plot_dependency_graph=True, + block_allocation=True, + ) as exe: + cloudpickle_register(ind=1) + future_1 = exe.submit(add_function, 1, parameter_2=2) + future_2 = exe.submit(add_function, 1, parameter_2=future_1) + self.assertTrue(future_1.done()) + self.assertTrue(future_2.done()) + self.assertEqual(len(exe._future_hash_dict), 2) + self.assertEqual(len(exe._task_hash_dict), 2) + nodes, edges = generate_nodes_and_edges( + task_hash_dict=exe._task_hash_dict, + future_hash_inverse_dict={ + v: k for k, v in exe._future_hash_dict.items() + }, + ) + self.assertEqual(len(nodes), 5) + self.assertEqual(len(edges), 4) + + def test_executor_dependency_plot_filename(self): + graph_file = 
diff --git a/tests/test_plot_dependency.py b/tests/test_plot_dependency.py
new file mode 100644
index 00000000..3b79f960
--- /dev/null
+++ b/tests/test_plot_dependency.py
@@ -0,0 +1,270 @@
+import os
+import unittest
+from time import sleep
+
+from executorlib import (
+    SingleNodeExecutor,
+    SlurmJobExecutor,
+    SlurmClusterExecutor,
+)
+from executorlib.standalone.plot import generate_nodes_and_edges
+from executorlib.standalone.serialize import cloudpickle_register
+
+
+try:
+    import pygraphviz
+
+    skip_graphviz_test = False
+except ImportError:
+    skip_graphviz_test = True
+
+
+def add_function(parameter_1, parameter_2):
+    sleep(0.2)
+    return parameter_1 + parameter_2
+
+
+def generate_tasks(length):
+    sleep(0.2)
+    return range(length)
+
+
+def calc_from_lst(lst, ind, parameter):
+    sleep(0.2)
+    return lst[ind] + parameter
+
+
+def merge(lst):
+    sleep(0.2)
+    return sum(lst)
+
+
+@unittest.skipIf(
+    skip_graphviz_test,
+    "graphviz is not installed, so the plot_dependency_graph tests are skipped.",
+)
+class TestLocalExecutorWithDependencies(unittest.TestCase):
+    def test_executor_dependency_plot(self):
+        with SingleNodeExecutor(
+            max_cores=1,
+            plot_dependency_graph=True,
+            block_allocation=True,
+        ) as exe:
+            cloudpickle_register(ind=1)
+            future_1 = exe.submit(add_function, 1, parameter_2=2)
+            future_2 = exe.submit(add_function, 1, parameter_2=future_1)
+            self.assertTrue(future_1.done())
+            self.assertTrue(future_2.done())
+            self.assertEqual(len(exe._future_hash_dict), 2)
+            self.assertEqual(len(exe._task_hash_dict), 2)
+            nodes, edges = generate_nodes_and_edges(
+                task_hash_dict=exe._task_hash_dict,
+                future_hash_inverse_dict={
+                    v: k for k, v in exe._future_hash_dict.items()
+                },
+            )
+            self.assertEqual(len(nodes), 5)
+            self.assertEqual(len(edges), 4)
+
+    def test_executor_dependency_plot_filename(self):
+        graph_file = os.path.join(os.path.dirname(__file__), "test.png")
+        with SingleNodeExecutor(
+            max_cores=1,
+            block_allocation=False,
+            plot_dependency_graph=False,
+            plot_dependency_graph_filename=graph_file,
+        ) as exe:
+            cloudpickle_register(ind=1)
+            future_1 = exe.submit(add_function, 1, parameter_2=2)
+            future_2 = exe.submit(add_function, 1, parameter_2=future_1)
+            self.assertTrue(future_1.done())
+            self.assertTrue(future_2.done())
+        self.assertTrue(os.path.exists(graph_file))
+        os.remove(graph_file)
+
+    def test_many_to_one_plot(self):
+        length = 5
+        parameter = 1
+        with SingleNodeExecutor(
+            max_cores=2,
+            block_allocation=False,
+            plot_dependency_graph=True,
+        ) as exe:
+            cloudpickle_register(ind=1)
+            future_lst = exe.submit(
+                generate_tasks,
+                length=length,
+                resource_dict={"cores": 1},
+            )
+            lst = []
+            for i in range(length):
+                lst.append(
+                    exe.submit(
+                        calc_from_lst,
+                        lst=future_lst,
+                        ind=i,
+                        parameter=parameter,
+                        resource_dict={"cores": 1},
+                    )
+                )
+            future_sum = exe.submit(
+                merge,
+                lst=lst,
+                resource_dict={"cores": 1},
+            )
+            self.assertTrue(future_lst.done())
+            for l in lst:
+                self.assertTrue(l.done())
+            self.assertTrue(future_sum.done())
+            self.assertEqual(len(exe._future_hash_dict), 7)
+            self.assertEqual(len(exe._task_hash_dict), 7)
+            nodes, edges = generate_nodes_and_edges(
+                task_hash_dict=exe._task_hash_dict,
+                future_hash_inverse_dict={
+                    v: k for k, v in exe._future_hash_dict.items()
+                },
+            )
+            self.assertEqual(len(nodes), 18)
+            self.assertEqual(len(edges), 21)
+
+
+@unittest.skipIf(
+    skip_graphviz_test,
+    "graphviz is not installed, so the plot_dependency_graph tests are skipped.",
+)
+class TestSlurmAllocationExecutorWithDependencies(unittest.TestCase):
+    def test_executor_dependency_plot(self):
+        with SlurmJobExecutor(
+            max_cores=1,
+            block_allocation=False,
+            plot_dependency_graph=True,
+        ) as exe:
+            cloudpickle_register(ind=1)
+            future_1 = exe.submit(add_function, 1, parameter_2=2)
+            future_2 = exe.submit(add_function, 1, parameter_2=future_1)
+            self.assertTrue(future_1.done())
+            self.assertTrue(future_2.done())
+            self.assertEqual(len(exe._future_hash_dict), 2)
+            self.assertEqual(len(exe._task_hash_dict), 2)
+            nodes, edges = generate_nodes_and_edges(
+                task_hash_dict=exe._task_hash_dict,
+                future_hash_inverse_dict={
+                    v: k for k, v in exe._future_hash_dict.items()
+                },
+            )
+            self.assertEqual(len(nodes), 5)
+            self.assertEqual(len(edges), 4)
+
+    def test_many_to_one_plot(self):
+        length = 5
+        parameter = 1
+        with SlurmJobExecutor(
+            max_cores=2,
+            block_allocation=False,
+            plot_dependency_graph=True,
+        ) as exe:
+            cloudpickle_register(ind=1)
+            future_lst = exe.submit(
+                generate_tasks,
+                length=length,
+                resource_dict={"cores": 1},
+            )
+            lst = []
+            for i in range(length):
+                lst.append(
+                    exe.submit(
+                        calc_from_lst,
+                        lst=future_lst,
+                        ind=i,
+                        parameter=parameter,
+                        resource_dict={"cores": 1},
+                    )
+                )
+            future_sum = exe.submit(
+                merge,
+                lst=lst,
+                resource_dict={"cores": 1},
+            )
+            self.assertTrue(future_lst.done())
+            for l in lst:
+                self.assertTrue(l.done())
+            self.assertTrue(future_sum.done())
+            self.assertEqual(len(exe._future_hash_dict), 7)
+            self.assertEqual(len(exe._task_hash_dict), 7)
+            nodes, edges = generate_nodes_and_edges(
+                task_hash_dict=exe._task_hash_dict,
+                future_hash_inverse_dict={
+                    v: k for k, v in exe._future_hash_dict.items()
+                },
+            )
+            self.assertEqual(len(nodes), 18)
+            self.assertEqual(len(edges), 21)
+
+
+@unittest.skipIf(
+    skip_graphviz_test,
+    "graphviz is not installed, so the plot_dependency_graph tests are skipped.",
+)
+class TestSlurmSubmissionExecutorWithDependencies(unittest.TestCase):
+    def test_executor_dependency_plot(self):
+        with SlurmClusterExecutor(
+            plot_dependency_graph=True,
+        ) as exe:
+            cloudpickle_register(ind=1)
+            future_1 = exe.submit(add_function, 1, parameter_2=2)
+            future_2 = exe.submit(add_function, 1, parameter_2=future_1)
+            self.assertTrue(future_1.done())
+            self.assertTrue(future_2.done())
+            self.assertEqual(len(exe._future_hash_dict), 2)
+            self.assertEqual(len(exe._task_hash_dict), 2)
+            nodes, edges = generate_nodes_and_edges(
+                task_hash_dict=exe._task_hash_dict,
+                future_hash_inverse_dict={
+                    v: k for k, v in exe._future_hash_dict.items()
+                },
+            )
+            self.assertEqual(len(nodes), 5)
+            self.assertEqual(len(edges), 4)
+
+    def test_many_to_one_plot(self):
+        length = 5
+        parameter = 1
+        with SlurmClusterExecutor(
+            plot_dependency_graph=True,
+        ) as exe:
+            cloudpickle_register(ind=1)
+            future_lst = exe.submit(
+                generate_tasks,
+                length=length,
+                resource_dict={"cores": 1},
+            )
+            lst = []
+            for i in range(length):
+                lst.append(
+                    exe.submit(
+                        calc_from_lst,
+                        lst=future_lst,
+                        ind=i,
+                        parameter=parameter,
+                        resource_dict={"cores": 1},
+                    )
+                )
+            future_sum = exe.submit(
+                merge,
+                lst=lst,
+                resource_dict={"cores": 1},
+            )
+            self.assertTrue(future_lst.done())
+            for l in lst:
+                self.assertTrue(l.done())
+            self.assertTrue(future_sum.done())
+            self.assertEqual(len(exe._future_hash_dict), 7)
+            self.assertEqual(len(exe._task_hash_dict), 7)
+            nodes, edges = generate_nodes_and_edges(
+                task_hash_dict=exe._task_hash_dict,
+                future_hash_inverse_dict={
+                    v: k for k, v in exe._future_hash_dict.items()
+                },
+            )
+            self.assertEqual(len(nodes), 18)
+            self.assertEqual(len(edges), 21)
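The new test module above exercises the dependency-graph path: with plot_dependency_graph=True the executor records the task graph instead of executing it, and generate_nodes_and_edges turns the recorded hashes into node and edge lists. A minimal sketch of that pattern, assuming the private _task_hash_dict/_future_hash_dict attributes stay as the tests use them (pygraphviz is required, as in the skip condition above):

from executorlib import SingleNodeExecutor
from executorlib.standalone.plot import generate_nodes_and_edges
from executorlib.standalone.serialize import cloudpickle_register


def add_function(parameter_1, parameter_2):
    return parameter_1 + parameter_2


if __name__ == "__main__":
    with SingleNodeExecutor(max_cores=1, plot_dependency_graph=True) as exe:
        cloudpickle_register(ind=1)
        future_1 = exe.submit(add_function, 1, parameter_2=2)
        future_2 = exe.submit(add_function, 1, parameter_2=future_1)
        # nothing is executed; the recorded hashes describe the graph
        nodes, edges = generate_nodes_and_edges(
            task_hash_dict=exe._task_hash_dict,
            future_hash_inverse_dict={
                v: k for k, v in exe._future_hash_dict.items()
            },
        )
        print(len(nodes), len(edges))  # 5 and 4 for this two-task chain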
diff --git a/tests/test_plot_dependency_flux.py b/tests/test_plot_dependency_flux.py
new file mode 100644
index 00000000..75c06f2b
--- /dev/null
+++ b/tests/test_plot_dependency_flux.py
@@ -0,0 +1,179 @@
+import os
+import unittest
+from time import sleep
+
+from executorlib import FluxJobExecutor, FluxClusterExecutor
+from executorlib.standalone.plot import generate_nodes_and_edges
+from executorlib.standalone.serialize import cloudpickle_register
+
+
+try:
+    import pygraphviz
+    import flux.job
+    from executorlib.interactive.flux import FluxPythonSpawner
+
+    skip_graphviz_flux_test = "FLUX_URI" not in os.environ
+except ImportError:
+    skip_graphviz_flux_test = True
+
+
+def add_function(parameter_1, parameter_2):
+    sleep(0.2)
+    return parameter_1 + parameter_2
+
+
+def generate_tasks(length):
+    sleep(0.2)
+    return range(length)
+
+
+def calc_from_lst(lst, ind, parameter):
+    sleep(0.2)
+    return lst[ind] + parameter
+
+
+def merge(lst):
+    sleep(0.2)
+    return sum(lst)
+
+
+@unittest.skipIf(
+    skip_graphviz_flux_test,
+    "Either graphviz or flux are not installed, so the plot_dependency_graph tests are skipped.",
+)
+class TestFluxAllocationExecutorWithDependencies(unittest.TestCase):
+    def test_executor_dependency_plot(self):
+        with FluxJobExecutor(
+            max_cores=1,
+            plot_dependency_graph=True,
+            block_allocation=False,
+        ) as exe:
+            cloudpickle_register(ind=1)
+            future_1 = exe.submit(add_function, 1, parameter_2=2)
+            future_2 = exe.submit(add_function, 1, parameter_2=future_1)
+            self.assertTrue(future_1.done())
+            self.assertTrue(future_2.done())
+            self.assertEqual(len(exe._future_hash_dict), 2)
+            self.assertEqual(len(exe._task_hash_dict), 2)
+            nodes, edges = generate_nodes_and_edges(
+                task_hash_dict=exe._task_hash_dict,
+                future_hash_inverse_dict={
+                    v: k for k, v in exe._future_hash_dict.items()
+                },
+            )
+            self.assertEqual(len(nodes), 5)
+            self.assertEqual(len(edges), 4)
+
+    def test_many_to_one_plot(self):
+        length = 5
+        parameter = 1
+        with FluxJobExecutor(
+            max_cores=2,
+            plot_dependency_graph=True,
+            block_allocation=True,
+        ) as exe:
+            cloudpickle_register(ind=1)
+            future_lst = exe.submit(
+                generate_tasks,
+                length=length,
+                resource_dict={"cores": 1},
+            )
+            lst = []
+            for i in range(length):
+                lst.append(
+                    exe.submit(
+                        calc_from_lst,
+                        lst=future_lst,
+                        ind=i,
+                        parameter=parameter,
+                        resource_dict={"cores": 1},
+                    )
+                )
+            future_sum = exe.submit(
+                merge,
+                lst=lst,
+                resource_dict={"cores": 1},
+            )
+            self.assertTrue(future_lst.done())
+            for l in lst:
+                self.assertTrue(l.done())
+            self.assertTrue(future_sum.done())
+            self.assertEqual(len(exe._future_hash_dict), 7)
+            self.assertEqual(len(exe._task_hash_dict), 7)
+            nodes, edges = generate_nodes_and_edges(
+                task_hash_dict=exe._task_hash_dict,
+                future_hash_inverse_dict={
+                    v: k for k, v in exe._future_hash_dict.items()
+                },
+            )
+            self.assertEqual(len(nodes), 18)
+            self.assertEqual(len(edges), 21)
+
+
+@unittest.skipIf(
+    skip_graphviz_flux_test,
+    "Either graphviz or flux are not installed, so the plot_dependency_graph tests are skipped.",
+)
+class TestFluxSubmissionExecutorWithDependencies(unittest.TestCase):
+    def test_executor_dependency_plot(self):
+        with FluxClusterExecutor(
+            plot_dependency_graph=True,
+        ) as exe:
+            cloudpickle_register(ind=1)
+            future_1 = exe.submit(add_function, 1, parameter_2=2)
+            future_2 = exe.submit(add_function, 1, parameter_2=future_1)
+            self.assertTrue(future_1.done())
+            self.assertTrue(future_2.done())
+            self.assertEqual(len(exe._future_hash_dict), 2)
+            self.assertEqual(len(exe._task_hash_dict), 2)
+            nodes, edges = generate_nodes_and_edges(
+                task_hash_dict=exe._task_hash_dict,
+                future_hash_inverse_dict={
+                    v: k for k, v in exe._future_hash_dict.items()
+                },
+            )
+            self.assertEqual(len(nodes), 5)
+            self.assertEqual(len(edges), 4)
+
+    def test_many_to_one_plot(self):
+        length = 5
+        parameter = 1
+        with FluxClusterExecutor(
+            plot_dependency_graph=True,
+        ) as exe:
+            cloudpickle_register(ind=1)
+            future_lst = exe.submit(
+                generate_tasks,
+                length=length,
+                resource_dict={"cores": 1},
+            )
+            lst = []
+            for i in range(length):
+                lst.append(
+                    exe.submit(
+                        calc_from_lst,
+                        lst=future_lst,
+                        ind=i,
+                        parameter=parameter,
+                        resource_dict={"cores": 1},
+                    )
+                )
+            future_sum = exe.submit(
+                merge,
+                lst=lst,
+                resource_dict={"cores": 1},
+            )
+            self.assertTrue(future_lst.done())
+            for l in lst:
+                self.assertTrue(l.done())
+            self.assertTrue(future_sum.done())
+            self.assertEqual(len(exe._future_hash_dict), 7)
+            self.assertEqual(len(exe._task_hash_dict), 7)
+            nodes, edges = generate_nodes_and_edges(
+                task_hash_dict=exe._task_hash_dict,
+                future_hash_inverse_dict={
+                    v: k for k, v in exe._future_hash_dict.items()
+                },
+            )
+            self.assertEqual(len(nodes), 18)
+            self.assertEqual(len(edges), 21)
diff --git a/tests/test_shell_executor.py b/tests/test_shell_executor.py
index dbdb9aea..971befd0 100644
--- a/tests/test_shell_executor.py
+++ b/tests/test_shell_executor.py
@@ -3,7 +3,7 @@
 import queue
 import unittest

-from executorlib import Executor
+from executorlib import SingleNodeExecutor
 from executorlib.standalone.serialize import cloudpickle_register
 from executorlib.interactive.shared import execute_parallel_tasks
 from executorlib.standalone.interactive.spawner import MpiExecSpawner
@@ -83,7 +83,7 @@ def test_broken_executable(self):
         )

     def test_shell_static_executor_args(self):
-        with Executor(max_workers=1) as exe:
+        with SingleNodeExecutor(max_workers=1) as exe:
             cloudpickle_register(ind=1)
             future = exe.submit(
                 submit_shell_command,
@@ -96,7 +96,7 @@
             self.assertTrue(future.done())

     def test_shell_static_executor_binary(self):
-        with Executor(max_workers=1) as exe:
+        with SingleNodeExecutor(max_workers=1) as exe:
             cloudpickle_register(ind=1)
             future = exe.submit(
                 submit_shell_command,
@@ -109,7 +109,7 @@
             self.assertTrue(future.done())

     def test_shell_static_executor_shell(self):
-        with Executor(max_workers=1) as exe:
+        with SingleNodeExecutor(max_workers=1) as exe:
             cloudpickle_register(ind=1)
             future = exe.submit(
                 submit_shell_command, "echo test", universal_newlines=True, shell=True
@@ -119,7 +119,7 @@
             self.assertTrue(future.done())

     def test_shell_executor(self):
-        with Executor(max_workers=2) as exe:
+        with SingleNodeExecutor(max_workers=2) as exe:
             cloudpickle_register(ind=1)
             f_1 = exe.submit(
                 submit_shell_command, ["echo", "test_1"], universal_newlines=True
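submit_shell_command is defined earlier in tests/test_shell_executor.py and is not part of these hunks; a hypothetical stand-in with the call signature used above (list or string command, universal_newlines, shell) might look like:

import subprocess


def submit_shell_command(command, universal_newlines=True, shell=False):
    # illustrative reimplementation only; the real helper lives earlier
    # in tests/test_shell_executor.py and may differ in detail
    return subprocess.check_output(
        command, universal_newlines=universal_newlines, shell=shell
    )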
diff --git a/tests/test_shell_interactive.py b/tests/test_shell_interactive.py
index e211b559..553639af 100644
--- a/tests/test_shell_interactive.py
+++ b/tests/test_shell_interactive.py
@@ -4,7 +4,7 @@
 import queue
 import unittest

-from executorlib import Executor
+from executorlib import SingleNodeExecutor
 from executorlib.standalone.serialize import cloudpickle_register
 from executorlib.interactive.shared import execute_parallel_tasks
 from executorlib.standalone.interactive.spawner import MpiExecSpawner
@@ -104,7 +104,7 @@ def test_execute_single_task(self):

     def test_shell_interactive_executor(self):
         cloudpickle_register(ind=1)
-        with Executor(
+        with SingleNodeExecutor(
             max_workers=1,
             init_function=init_process,
             block_allocation=True,
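The final hunk (truncated here) passes init_process as init_function together with block_allocation=True; init_process itself is not shown in this diff. As a hedged sketch of the general pattern, under the assumption that an init_function returns a dictionary whose entries are supplied as arguments to functions later submitted to that worker (init_worker and add_offset below are illustrative names, not from the test file):

from executorlib import SingleNodeExecutor
from executorlib.standalone.serialize import cloudpickle_register


def init_worker():
    # hypothetical init_function: the returned dict presets keyword
    # arguments for every function submitted to this worker
    return {"offset": 10}


def add_offset(x, offset):
    return x + offset


if __name__ == "__main__":
    with SingleNodeExecutor(
        max_workers=1, init_function=init_worker, block_allocation=True
    ) as exe:
        cloudpickle_register(ind=1)
        # "offset" is filled in from the init_function's return value
        print(exe.submit(add_offset, 1).result())  # prints 11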