Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[major] Refactor the Executor interface #548

Merged
merged 33 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
97a0920
[major] Refactor the Executor interface
jan-janssen Feb 1, 2025
07c27a7
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 1, 2025
fc0298c
fix test
jan-janssen Feb 1, 2025
f3b8616
update benchmark
jan-janssen Feb 1, 2025
0b38e5f
move interfaces to individual modules
jan-janssen Feb 1, 2025
41ca2ad
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 1, 2025
9f61c6e
Merge remote-tracking branch 'origin/main' into interface
jan-janssen Feb 1, 2025
4e903bc
fixes
jan-janssen Feb 1, 2025
7c19964
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 1, 2025
2e09bdc
Merge remote-tracking branch 'origin/main' into interface
jan-janssen Feb 1, 2025
1fb1859
remove create
jan-janssen Feb 1, 2025
57c5242
Merge remote-tracking branch 'origin/interface' into interface
jan-janssen Feb 1, 2025
4f4516b
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 1, 2025
454b336
remove redundant functionality
jan-janssen Feb 1, 2025
b89bca7
Move graph tests
jan-janssen Feb 1, 2025
89240d2
remove core check
jan-janssen Feb 1, 2025
8f01dfa
clean up
jan-janssen Feb 1, 2025
d9fcc61
more tests
jan-janssen Feb 1, 2025
28185b2
fix import
jan-janssen Feb 1, 2025
685c5dc
more fixes
jan-janssen Feb 1, 2025
672196e
Rename LocalExecutor to SingleNodeExecutor
jan-janssen Feb 1, 2025
5072318
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 1, 2025
8a37e60
test block allocation
jan-janssen Feb 1, 2025
d261621
Merge remote-tracking branch 'origin/interface' into interface
jan-janssen Feb 1, 2025
b7d5a8c
fix
jan-janssen Feb 1, 2025
a0bfc9f
flux fix
jan-janssen Feb 1, 2025
abeac75
Rename interfaces
jan-janssen Feb 2, 2025
814c0d2
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 2, 2025
acf4693
fix docstrings
jan-janssen Feb 2, 2025
0fe7dfe
add docstrings to create_*_executor() function
jan-janssen Feb 3, 2025
87b3380
fix test
jan-janssen Feb 3, 2025
9fd0ec3
Merge remote-tracking branch 'origin/main' into interface
jan-janssen Feb 3, 2025
0cf4b6a
merge
jan-janssen Feb 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 6 additions & 243 deletions executorlib/__init__.py
Original file line number Diff line number Diff line change
@@ -1,248 +1,11 @@
from typing import Callable, Optional

from executorlib._version import get_versions as _get_versions
from executorlib.interactive.executor import (
ExecutorWithDependencies as _ExecutorWithDependencies,
)
from executorlib.interactive.executor import create_executor as _create_executor
from executorlib.standalone.inputcheck import (
check_plot_dependency_graph as _check_plot_dependency_graph,
)
from executorlib.standalone.inputcheck import (
check_pysqa_config_directory as _check_pysqa_config_directory,
)
from executorlib.standalone.inputcheck import (
check_refresh_rate as _check_refresh_rate,
from executorlib.interfaces import (
FluxAllocationExecutor,
FluxSubmissionExecutor,
LocalExecutor,
SlurmAllocationExecutor,
SlurmSubmissionExecutor,
)

__version__ = _get_versions()["version"]
__all__: list = []


class Executor:
"""
The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or
preferable the flux framework for distributing python functions within a given resource allocation. In contrast to
the mpi4py.futures.MPIPoolExecutor the executorlib.Executor can be executed in a serial python process and does not
require the python script to be executed with MPI. It is even possible to execute the executorlib.Executor directly
in an interactive Jupyter notebook.

Args:
max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
recommended, as computers have a limited number of compute cores.
backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
max_cores (int): defines the number cores which can be used in parallel
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores (int): number of MPI cores to be used for each function call
- threads_per_core (int): number of OpenMP threads to be used for each function call
- gpus_per_core (int): number of GPUs per worker - defaults to 0
- cwd (str/None): current working directory where the parallel python task is executed
- openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
SLURM only) - default False
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
in principle any computer should be able to resolve that their own hostname
points to the same address as localhost. Still MacOS >= 12 seems to disable
this look up for security reasons. So on MacOS it is required to set this
option to true
block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource
requirements, executorlib supports block allocation. In this case all resources have
to be defined on the executor, rather than during the submission of the individual
function.
init_function (None): optional function to preset arguments for functions which are submitted later
disable_dependencies (boolean): Disable resolving future objects during the submission.
refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.

Examples:
```
>>> import numpy as np
>>> from executorlib import Executor
>>>
>>> def calc(i, j, k):
>>> from mpi4py import MPI
>>> size = MPI.COMM_WORLD.Get_size()
>>> rank = MPI.COMM_WORLD.Get_rank()
>>> return np.array([i, j, k]), size, rank
>>>
>>> def init_k():
>>> return {"k": 3}
>>>
>>> with Executor(cores=2, init_function=init_k) as p:
>>> fs = p.submit(calc, 2, j=4)
>>> print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
```
"""

def __init__(
self,
max_workers: Optional[int] = None,
backend: str = "local",
cache_directory: Optional[str] = None,
max_cores: Optional[int] = None,
resource_dict: Optional[dict] = None,
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[Callable] = None,
disable_dependencies: bool = False,
refresh_rate: float = 0.01,
plot_dependency_graph: bool = False,
plot_dependency_graph_filename: Optional[str] = None,
):
# Use __new__() instead of __init__(). This function is only implemented to enable auto-completion.
pass

def __new__(
cls,
max_workers: Optional[int] = None,
backend: str = "local",
cache_directory: Optional[str] = None,
max_cores: Optional[int] = None,
resource_dict: Optional[dict] = None,
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[Callable] = None,
disable_dependencies: bool = False,
refresh_rate: float = 0.01,
plot_dependency_graph: bool = False,
plot_dependency_graph_filename: Optional[str] = None,
):
"""
Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor,
executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor depending on which backend is available. The
executorlib.flux.PyFluxExecutor is the preferred choice while the executorlib.mpi.PyMPIExecutor is primarily used
for development and testing. The executorlib.flux.PyFluxExecutor requires flux-core from the flux-framework to be
installed and in addition flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor
requires the SLURM workload manager to be installed on the system.

Args:
max_workers (int): for backwards compatibility with the standard library, max_workers also defines the
number of cores which can be used in parallel - just like the max_cores parameter. Using
max_cores is recommended, as computers have a limited number of compute cores.
backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
max_cores (int): defines the number cores which can be used in parallel
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores (int): number of MPI cores to be used for each function call
- threads_per_core (int): number of OpenMP threads to be used for each function call
- gpus_per_core (int): number of GPUs per worker - defaults to 0
- cwd (str/None): current working directory where the parallel python task is executed
- openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
and SLURM only) - default False
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
only)
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
in principle any computer should be able to resolve that their own hostname
points to the same address as localhost. Still MacOS >= 12 seems to disable
this look up for security reasons. So on MacOS it is required to set this
option to true
block_allocation (boolean): To accelerate the submission of a series of python functions with the same
resource requirements, executorlib supports block allocation. In this case all
resources have to be defined on the executor, rather than during the submission
of the individual function.
init_function (None): optional function to preset arguments for functions which are submitted later
disable_dependencies (boolean): Disable resolving future objects during the submission.
refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.

"""
default_resource_dict: dict = {
"cores": 1,
"threads_per_core": 1,
"gpus_per_core": 0,
"cwd": None,
"openmpi_oversubscribe": False,
"slurm_cmd_args": [],
}
if resource_dict is None:
resource_dict = {}
resource_dict.update(
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
)
if "_submission" in backend and not plot_dependency_graph:
from executorlib.cache.executor import create_file_executor

return create_file_executor(
max_workers=max_workers,
backend=backend,
max_cores=max_cores,
cache_directory=cache_directory,
resource_dict=resource_dict,
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
flux_log_files=flux_log_files,
pysqa_config_directory=pysqa_config_directory,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
disable_dependencies=disable_dependencies,
)
elif not disable_dependencies:
_check_pysqa_config_directory(pysqa_config_directory=pysqa_config_directory)
return _ExecutorWithDependencies(
max_workers=max_workers,
backend=backend,
cache_directory=cache_directory,
max_cores=max_cores,
resource_dict=resource_dict,
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
flux_log_files=flux_log_files,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
refresh_rate=refresh_rate,
plot_dependency_graph=plot_dependency_graph,
plot_dependency_graph_filename=plot_dependency_graph_filename,
)
else:
_check_pysqa_config_directory(pysqa_config_directory=pysqa_config_directory)
_check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph)
_check_refresh_rate(refresh_rate=refresh_rate)
return _create_executor(
max_workers=max_workers,
backend=backend,
cache_directory=cache_directory,
max_cores=max_cores,
resource_dict=resource_dict,
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
flux_log_files=flux_log_files,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
)
Loading
Loading