diff --git a/.ci_support/build_notebooks.sh b/.ci_support/build_notebooks.sh new file mode 100755 index 00000000..08c14214 --- /dev/null +++ b/.ci_support/build_notebooks.sh @@ -0,0 +1,11 @@ +#!/bin/bash +# execute notebooks +i=0; +for notebook in $(ls notebooks/*.ipynb); do + papermill ${notebook} ${notebook%.*}-out.${notebook##*.} -k python3 || i=$((i+1)); +done; + +# push error to next level +if [ $i -gt 0 ]; then + exit 1; +fi; \ No newline at end of file diff --git a/.github/workflows/notebooks.yml b/.github/workflows/notebooks.yml index f4d28f53..9cfb0934 100644 --- a/.github/workflows/notebooks.yml +++ b/.github/workflows/notebooks.yml @@ -34,4 +34,4 @@ jobs: timeout-minutes: 5 run: > flux start - papermill notebooks/examples.ipynb examples-out.ipynb -k "python3" + .ci_support/build_notebooks.sh diff --git a/README.md b/README.md index f432ccfa..200e0f4e 100644 --- a/README.md +++ b/README.md @@ -3,111 +3,122 @@ [![Coverage Status](https://coveralls.io/repos/github/pyiron/executorlib/badge.svg?branch=main)](https://coveralls.io/github/pyiron/executorlib?branch=main) [![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/pyiron/executorlib/HEAD?labpath=notebooks%2Fexamples.ipynb) -## Challenges -In high performance computing (HPC) the Python programming language is commonly used as high-level language to -orchestrate the coupling of scientific applications. Still the efficient usage of highly parallel HPC clusters remains -challenging, in primarily three aspects: - -* **Communication**: Distributing python function calls over hundreds of compute node and gathering the results on a - shared file system is technically possible, but highly inefficient. A socket-based communication approach is - preferable. -* **Resource Management**: Assigning Python functions to GPUs or executing Python functions on multiple CPUs using the - message passing interface (MPI) requires major modifications to the python workflow. -* **Integration**: Existing workflow libraries implement a secondary the job management on the Python level rather than - leveraging the existing infrastructure provided by the job scheduler of the HPC. - -### executorlib is ... -In a given HPC allocation the `executorlib` library addresses these challenges by extending the Executor interface -of the standard Python library to support the resource assignment in the HPC context. Computing resources can either be -assigned on a per function call basis or as a block allocation on a per Executor basis. The `executorlib` library -is built on top of the [flux-framework](https://flux-framework.org) to enable fine-grained resource assignment. In -addition, [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com) is supported as alternative -queuing system and for workstation installations `executorlib` can be installed without a job scheduler. - -### executorlib is not ... -The executorlib library is not designed to request an allocation from the job scheduler of an HPC. Instead within a given -allocation from the job scheduler the `executorlib` library can be employed to distribute a series of python -function calls over the available computing resources to achieve maximum computing resource utilization. - -## Example -The following examples illustrates how `executorlib` can be used to distribute a series of MPI parallel function calls -within a queuing system allocation. `example.py`: +Up-scale python functions for high performance computing (HPC) with executorlib. 
+
+## Key Features
+* **Up-scale your Python functions beyond a single computer.** - executorlib extends the [Executor interface](https://docs.python.org/3/library/concurrent.futures.html#executor-objects)
+ from the Python standard library and combines it with job schedulers for high performance computing (HPC) including
+ the [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com) and [flux](http://flux-framework.org).
+ With this combination executorlib allows users to distribute their Python functions over multiple compute nodes.
+* **Parallelize your Python program one function at a time** - executorlib allows users to assign dedicated computing
+ resources like CPU cores, threads or GPUs to one Python function call at a time. So you can accelerate your Python
+ code function by function.
+* **Permanent caching of intermediate results to accelerate rapid prototyping** - To accelerate the development of
+ machine learning pipelines and simulation workflows executorlib provides optional caching of intermediate results for
+ iterative development in interactive environments like Jupyter notebooks.
+
+## Examples
+The Python standard library provides the [Executor interface](https://docs.python.org/3/library/concurrent.futures.html#executor-objects)
+with the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) and the
+[ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) for parallel
+execution of Python functions on a single computer. executorlib extends this functionality to distribute Python
+functions over multiple computers within a high performance computing (HPC) cluster. This can either be achieved by
+submitting each function as an individual job to the HPC job scheduler - [HPC Submission Mode]() - or by requesting a
+compute allocation of multiple nodes and then distributing the Python functions within this allocation - [HPC Allocation Mode]().
+Finally, to accelerate the development process executorlib also provides a [Local Mode]() to use the executorlib
+functionality on a single workstation for testing. Starting with the [Local Mode](), which is selected by setting the backend parameter
+to local - `backend="local"`:
```python
-import flux.job
 from executorlib import Executor
+
+with Executor(backend="local") as exe:
+    future_lst = [exe.submit(sum, [i, i]) for i in range(1, 5)]
+    print([f.result() for f in future_lst])
+```
+In the same way executorlib can also execute Python functions which use additional computing resources, like multiple
+CPU cores, CPU threads or GPUs.
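+As a minimal sketch - the `threads_per_core` key belongs to the resource dictionary described below and the thread count
+is only an illustration - a function that relies on thread-parallel libraries can request multiple threads per core:
+```python
+from executorlib import Executor
+
+
+def calc_with_threads(i):
+    import os
+
+    os.environ["OMP_NUM_THREADS"] = "2"  # limit OpenMP threading inside linked libraries
+    return i
+
+
+with Executor(backend="local") as exe:
+    fs = exe.submit(calc_with_threads, 3, resource_dict={"threads_per_core": 2})
+    print(fs.result())
+```
+The same pattern applies to functions which are MPI parallel.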
For example if the Python function internally uses the Message Passing Interface (MPI) +via the [mpi4py](https://mpi4py.readthedocs.io) Python libary: +```python +from executorlib import Executor + + def calc(i): from mpi4py import MPI + size = MPI.COMM_WORLD.Get_size() rank = MPI.COMM_WORLD.Get_rank() return i, size, rank -with flux.job.FluxExecutor() as flux_exe: - with Executor(max_cores=2, executor=flux_exe, resource_dict={"cores": 2}) as exe: - fs = exe.submit(calc, 3) - print(fs.result()) -``` -This example can be executed using: -``` -python example.py -``` -Which returns: -``` ->>> [(0, 2, 0), (0, 2, 1)], [(1, 2, 0), (1, 2, 1)] -``` -The important part in this example is that [mpi4py](https://mpi4py.readthedocs.io) is only used in the `calc()` -function, not in the python script, consequently it is not necessary to call the script with `mpiexec` but instead -a call with the regular python interpreter is sufficient. This highlights how `executorlib` allows the users to -parallelize one function at a time and not having to convert their whole workflow to use [mpi4py](https://mpi4py.readthedocs.io). -The same code can also be executed inside a jupyter notebook directly which enables an interactive development process. - -The interface of the standard [concurrent.futures.Executor](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures) -is extended by adding the option `cores_per_worker=2` to assign multiple MPI ranks to each function call. To create two -workers the maximum number of cores can be increased to `max_cores=4`. In this case each worker receives two cores -resulting in a total of four CPU cores being utilized. - -After submitting the function `calc()` with the corresponding parameter to the executor `exe.submit(calc, 0)` -a python [`concurrent.futures.Future`](https://docs.python.org/3/library/concurrent.futures.html#future-objects) is -returned. Consequently, the `executorlib.Executor` can be used as a drop-in replacement for the -[`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures) -which allows the user to add parallelism to their workflow one function at a time. - -## Disclaimer -While we try to develop a stable and reliable software library, the development remains a opensource project under the -BSD 3-Clause License without any warranties:: + +with Executor(backend="local") as exe: + fs = exe.submit(calc, 3, resource_dict={"cores": 2}) + print(fs.result()) ``` -BSD 3-Clause License - -Copyright (c) 2022, Jan Janssen -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - -* Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. - -* Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -* Neither the name of the copyright holder nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
-FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
-CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
-OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+The additional `resource_dict` parameter defines the computing resources allocated to the execution of the submitted
+Python function. In addition to the compute cores `cores`, the resource dictionary can also define the threads per core
+as `threads_per_core`, the GPUs per core as `gpus_per_core`, the working directory with `cwd`, the option to use the
+OpenMPI oversubscribe feature with `openmpi_oversubscribe` and finally for the [Simple Linux Utility for Resource
+Management (SLURM)](https://slurm.schedmd.com) queuing system the option to provide additional command line arguments
+with the `slurm_cmd_args` parameter - [resource dictionary]().
+
+This flexibility to assign computing resources on a per-function-call basis simplifies the up-scaling of Python programs.
+Only the parts of the Python functions which benefit from parallel execution are implemented as MPI parallel Python
+functions, while the rest of the program remains serial.
+
+The same function can be submitted to the [SLURM](https://slurm.schedmd.com) queuing system by just changing the `backend`
+parameter to `slurm_submission`. The rest of the example remains the same, which highlights how executorlib accelerates
+the rapid prototyping and up-scaling of HPC Python programs.
+```python
+from executorlib import Executor
+
+
+def calc(i):
+    from mpi4py import MPI
+
+    size = MPI.COMM_WORLD.Get_size()
+    rank = MPI.COMM_WORLD.Get_rank()
+    return i, size, rank
+
+
+with Executor(backend="slurm_submission") as exe:
+    fs = exe.submit(calc, 3, resource_dict={"cores": 2})
+    print(fs.result())
```
+In this case the [Python simple queuing system adapter (pysqa)](https://pysqa.readthedocs.io) is used to submit the
+`calc()` function to the [SLURM](https://slurm.schedmd.com) job scheduler and request an allocation with two CPU cores
+for the execution of the function - [HPC Submission Mode](). In the background the [sbatch](https://slurm.schedmd.com/sbatch.html)
+command is used to request the allocation to execute the Python function.
+
+Within a given [SLURM](https://slurm.schedmd.com) allocation executorlib can also be used to assign a subset of the
+available computing resources to execute a given Python function. In terms of the [SLURM](https://slurm.schedmd.com)
+commands, this functionality internally uses the [srun](https://slurm.schedmd.com/srun.html) command to receive a subset
+of the resources of a given queuing system allocation.
+```python
+from executorlib import Executor
+
-# Documentation
+def calc(i):
+    from mpi4py import MPI
+
+    size = MPI.COMM_WORLD.Get_size()
+    rank = MPI.COMM_WORLD.Get_rank()
+    return i, size, rank
+
+
+with Executor(backend="slurm_allocation") as exe:
+    fs = exe.submit(calc, 3, resource_dict={"cores": 2})
+    print(fs.result())
+```
+In addition to the support for [SLURM](https://slurm.schedmd.com), executorlib also provides support for the hierarchical
+[flux](http://flux-framework.org) job scheduler.
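+As a sketch of the same pattern with flux - the backend name is an assumption here, using `flux_allocation` by analogy
+with `slurm_allocation` above; check the executorlib documentation for the exact name supported by your installation:
+```python
+from executorlib import Executor
+
+
+def calc(i):
+    from mpi4py import MPI
+
+    size = MPI.COMM_WORLD.Get_size()
+    rank = MPI.COMM_WORLD.Get_rank()
+    return i, size, rank
+
+
+with Executor(backend="flux_allocation") as exe:
+    fs = exe.submit(calc, 3, resource_dict={"cores": 2})
+    print(fs.result())
+```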
The [flux](http://flux-framework.org) job scheduler is developed at
+[Lawrence Livermore National Laboratory](https://computing.llnl.gov/projects/flux-building-framework-resource-management)
+to address the needs of the upcoming generation of exascale computers. Still, even on traditional HPC clusters the
+hierarchical approach of [flux](http://flux-framework.org) is beneficial to distribute hundreds of tasks within a
+given allocation. Even when [SLURM](https://slurm.schedmd.com) is used as the primary job scheduler of your HPC, it is
+recommended to use [SLURM with flux]() as the hierarchical job scheduler within the allocations.
+
+## Documentation
 * [Installation](https://executorlib.readthedocs.io/en/latest/installation.html)
   * [Compatible Job Schedulers](https://executorlib.readthedocs.io/en/latest/installation.html#compatible-job-schedulers)
   * [executorlib with Flux Framework](https://executorlib.readthedocs.io/en/latest/installation.html#executorlib-with-flux-framework)
diff --git a/binder/environment.yml b/binder/environment.yml
index fdcaa193..a598ce5e 100644
--- a/binder/environment.yml
+++ b/binder/environment.yml
@@ -11,3 +11,8 @@ dependencies:
 - flux-pmix =0.5.0
 - versioneer =0.28
 - h5py =3.12.1
+- matplotlib =3.9.2
+- networkx =3.4.2
+- pygraphviz =1.14
+- pysqa =0.2.2
+- ipython =8.29.0
diff --git a/docs/_toc.yml b/docs/_toc.yml
index 541e17d6..ba8e12b3 100644
--- a/docs/_toc.yml
+++ b/docs/_toc.yml
@@ -2,7 +2,9 @@ format: jb-book
 root: README
 chapters:
 - file: installation.md
-- file: examples.ipynb
-- file: development.md
+- file: 1-local.ipynb
+- file: 2-hpc-submission.ipynb
+- file: 3-hpc-allocation.ipynb
 - file: trouble_shooting.md
+- file: 4-developer.ipynb
 - file: api.rst
diff --git a/docs/installation.md b/docs/installation.md
index 4d53ce36..8e25616c 100644
--- a/docs/installation.md
+++ b/docs/installation.md
@@ -1,16 +1,78 @@
 # Installation
-## Compatible Job Schedulers
-For optimal performance the [flux framework](https://flux-framework.org) is recommended as job scheduler. Even when the
-[Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com) or any other job scheduler is already
-installed on the HPC cluster [flux framework](https://flux-framework.org) can be installed as a secondary job scheduler
-to leverage [flux framework](https://flux-framework.org) for the distribution of resources within a given allocation of
-the primary scheduler.
+## Minimal
+Executorlib internally uses the [zero message queue (zmq)](https://zeromq.org) for communication between the Python
+processes and [cloudpickle](https://github.com/cloudpipe/cloudpickle) for serialization of Python functions to communicate
+them from one process to another.
So for a minimal installation of executorlib only these two dependencies are installed:
+```
+pip install executorlib
+```
+As an alternative to the [Python package manager](https://pypi.org/project/executorlib/), executorlib can also be installed
+via the [conda package manager](https://anaconda.org/conda-forge/executorlib):
+```
+conda install -c conda-forge executorlib
+```
+A number of features are not available in this minimalistic installation of executorlib; these include the execution of
+MPI parallel Python functions, which requires the [mpi4py](https://mpi4py.readthedocs.io) package, the caching based on
+the hierarchical data format (HDF5), which requires the [h5py](https://www.h5py.org) package, the submission to job
+schedulers, which requires the [Python simple queuing system adapter (pysqa)](https://pysqa.readthedocs.io) and the
+visualisation of dependencies, which requires a number of visualisation packages.
-Alternatively, `executorlib` can directly create job steps in a SLURM allocation using the `srun `command. Still this
-always queries the central database of the SLURM job scheduler which can decrease the performance of the job scheduler
-and is not recommended.
+## MPI Support
+The submission of MPI parallel Python functions requires the installation of the [mpi4py](https://mpi4py.readthedocs.io)
+package. This can be installed in combination with executorlib using either the [Python package manager](https://pypi.org/project/mpi4py/):
+```
+pip install executorlib[mpi]
+```
+Or alternatively using the [conda package manager](https://anaconda.org/conda-forge/mpi4py):
+```
+conda install -c conda-forge executorlib mpi4py
+```
+Given the compiled bindings included in the [mpi4py](https://mpi4py.readthedocs.io) package it is recommended to use a binary
+distribution of [mpi4py](https://mpi4py.readthedocs.io) and only compile it manually when a specific version of MPI is
+used. The mpi4py documentation covers the [installation of mpi4py](https://mpi4py.readthedocs.io/en/stable/install.html)
+in more detail.
+
+## Caching
+While the caching is an optional feature for [Local Mode] and for the distribution of Python functions in a given
+allocation of an HPC job scheduler [HPC Allocation Mode], it is required for the submission of individual functions to
+an HPC job scheduler [HPC Submission Mode]. This is necessary because in [HPC Submission Mode] the Python function is stored
+on the file system until the requested computing resources become available. The caching is implemented based on the
+hierarchical data format (HDF5). The corresponding [h5py](https://www.h5py.org) package can be installed using either
+the [Python package manager](https://pypi.org/project/h5py/):
+```
+pip install executorlib[cache]
+```
+Or alternatively using the [conda package manager](https://anaconda.org/conda-forge/h5py):
+```
+conda install -c conda-forge executorlib h5py
+```
+Again, given the compiled bindings of the [h5py](https://www.h5py.org) package to the HDF5 format, a binary distribution is
+recommended. The h5py documentation covers the [installation of h5py](https://docs.h5py.org/en/latest/build.html) in
+more detail.
+
+## HPC Submission Mode
+[HPC Submission Mode] requires the [Python simple queuing system adapter (pysqa)](https://pysqa.readthedocs.io) to
+interface with the job schedulers and the [h5py](https://www.h5py.org) package to enable caching, as explained above.
Both
+can be installed via the [Python package manager](https://pypi.org/project/pysqa/):
+```
+pip install executorlib[submission]
+```
+Or alternatively using the [conda package manager](https://anaconda.org/conda-forge/pysqa):
+```
+conda install -c conda-forge executorlib h5py pysqa
+```
+Depending on the choice of job scheduler, the [pysqa](https://pysqa.readthedocs.io) package might require additional
+dependencies; at least for [SLURM](https://slurm.schedmd.com) no additional requirements are needed. The pysqa
+documentation covers the [installation of pysqa](https://pysqa.readthedocs.io/en/latest/installation.html) in more
+detail.
+
+## HPC Allocation Mode
+For optimal performance in [HPC Allocation Mode] the [flux framework](https://flux-framework.org) is recommended as job
+scheduler. Even when the [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com) or any other
+job scheduler is already installed on the HPC cluster, the [flux framework](https://flux-framework.org) can be installed as
+a secondary job scheduler to leverage [flux framework](https://flux-framework.org) for the distribution of resources
+within a given allocation of the primary scheduler.
-## executorlib with Flux Framework
 The [flux framework](https://flux-framework.org) uses `libhwloc` and `pmi` to understand the hardware it is running on and to booststrap MPI.
 `libhwloc` not only assigns CPU cores but also GPUs. This requires `libhwloc` to be compiled with support for GPUs from your
 vendor. In the same way the version of `pmi` for your queuing system has to be compatible with the version
@@ -57,9 +119,10 @@ For the version 5 of openmpi the backend changed to `pmix`, this requires the ad
 ```
 conda install -c conda-forge flux-core flux-sched flux-pmix openmpi>=5 executorlib
 ```
-In addition, the `pmi="pmix"` parameter has to be set for the `executorlib.Executor` to switch to `pmix` as backend.
+In addition, the `flux_executor_pmi_mode="pmix"` parameter has to be set for the `executorlib.Executor` to switch to
+`pmix` as backend.
-## Test Flux Framework
+### Test Flux Framework
 To validate the installation of flux and confirm the GPUs are correctly recognized, you can start a flux session on the
 login node using:
 ```
@@ -98,15 +161,71 @@ startup process of flux using:
 srun –mpi=pmi2 flux start python
 ```
-## Without Flux Framework
-It is possible to install `executorlib` without flux, for example for using it on a local workstation or in combination
-with the [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com). While this is not recommended
-in the high performance computing (HPC) context as `executorlib` with `block_allocation=False` is going to create a SLURM
-job step for each submitted python function.
+### Flux with Jupyter
+Two options are available to use flux inside the Jupyter notebook or JupyterLab environment. The first option is to
+start the flux session and then start the Jupyter notebook inside the flux session. This just requires a single call on
+the command line:
+```
+flux start jupyter notebook
+```
+The second option is to create a separate Jupyter kernel for flux. This option requires multiple configuration steps,
+but it has the advantage that it is also compatible with the multi-user JupyterHub environment. Start by identifying
+the directory Jupyter searches for Jupyter kernels:
+```
+jupyter kernelspec list
+```
+This returns a list of Jupyter kernels, commonly stored in `~/.local/share/jupyter`. It is recommended to create the
It is recommended to create the +flux kernel in this directory. Start by creating the corresponding directory by copying one of the existing kernels: +``` +cp -r ~/.local/share/jupyter/kernels/python3 ~/.local/share/jupyter/kernels/flux +``` +In the directory a JSON file is created which contains the configuration of the Jupyter Kernel. You can use an editor of +your choice, here we use vi to create the `kernel.json` file: +``` +vi ~/.local/share/jupyter/kernels/flux/kernel.json +``` +Inside the file copy the following content. The first entry under the name `argv` provides the command to start the +jupyter kernel. Typically this would be just calling python with the parameters to launch an ipykernel. In front of this +command the `flux start` command is added. +``` +{ + "argv": [ + "flux", + "start", + "/srv/conda/envs/notebook/bin/python", + "-m", + "ipykernel_launcher", + "-f", + "{connection_file}" + ], + "display_name": "Flux", + "language": "python", + "metadata": { + "debugger": true + } +} +``` +More details for the configuration of Jupyter kernels is available as part of the [Jupyter documentation](https://jupyter-client.readthedocs.io/en/latest/kernels.html#kernel-specs). -In this case `executorlib` can be installed using: +## Visualisation +The visualisation of the dependency graph with the `plot_dependency_graph` parameter requires [pygraphviz](https://pygraphviz.github.io/documentation/stable/). +This can installed via the [Python package manager](https://pypi.org/project/pygraphviz/): ``` -conda install -c conda-forge executorlib +pip install executorlib[graph] +``` +Or alternatively using the [conda package manager](https://anaconda.org/conda-forge/pygraphviz): ``` +conda install -c conda-forge executorlib pygraphviz matplotlib networkx ipython +``` +Again given the C++ bindings of [pygraphviz](https://pygraphviz.github.io/documentation/stable/) to the graphviz library +it is recommended to install a binary distribution. The pygraphviz documentation covers the [installation of pygraphviz](https://pygraphviz.github.io/documentation/stable/install.html) +in more detail. Furthermore, [matplotlib](https://matplotlib.org), [networkx](https://networkx.org) and [ipython](https://ipython.readthedocs.io) +are installed as additional requirements for the visualisation. -This also includes workstation installations on Windows and MacOS. +## For Developers +To install a specific development branch of executorlib you use the [Python package manager](https://pypi.org/project/executorlib/) +and directly install from the Github repository executorlib is hosted on: +``` +pip install git+https://github.com/pyiron/executorlib.git@main +``` +In this example the `main` branch is selected. To select a different branch just replace `main` with your target branch. \ No newline at end of file diff --git a/docs/trouble_shooting.md b/docs/trouble_shooting.md index 8d79e140..b82cde29 100644 --- a/docs/trouble_shooting.md +++ b/docs/trouble_shooting.md @@ -1,103 +1,57 @@ -# Trouble shooting - -## When `flux` fails: - -### Step-by-Step Guide to Create a Custom Jupyter Kernel for Flux - -#### Step 1: Create a New Kernel Specification - -1. Install [`flux-core`](https://anaconda.org/conda-forge/flux-core) in your Jupyter environment: - - ```bash - conda install -c conda-forge flux-core - ``` - -2. **Find the Jupyter Kernel Directory**: - - Open your terminal or command prompt and run: - - ```bash - jupyter --paths - ``` - - This command will display the paths where Jupyter looks for kernels. 
You'll usually find a directory named `kernels` under the `jupyter` data directory. You will create a new directory for the Flux kernel in the `kernels` directory. - -3. **Create the Kernel Directory**: - - Navigate to the kernels directory (e.g., `~/.local/share/jupyter/kernels` on Linux or macOS) and create a new directory called `flux`. - - ```bash - mkdir -p ~/.local/share/jupyter/kernels/flux - ``` - - If you're using Windows, the path will be different, such as `C:\Users\\AppData\Roaming\jupyter\kernels`. - -4. **Create the `kernel.json` File**: - - Inside the new `flux` directory, create a file named `kernel.json`: - - ```bash - nano ~/.local/share/jupyter/kernels/flux/kernel.json - ``` - - Paste the following content into the file: - - ```json - { - "argv": [ - "flux", - "start", - "/srv/conda/envs/notebook/bin/python", - "-m", - "ipykernel_launcher", - "-f", - "{connection_file}" - ], - "display_name": "Flux", - "language": "python", - "metadata": { - "debugger": true - } - } - ``` - - - **`argv`**: This array specifies the command to start the Jupyter kernel. It uses `flux start` to launch Python in the Flux environment. - - **`display_name`**: The name displayed in Jupyter when selecting the kernel. - - **`language`**: The programming language (`python`). - - **Note**: - - - Make sure to replace `"/srv/conda/envs/notebook/bin/python"` with the correct path to your Python executable. You can find this by running `which python` or `where python` in your terminal. - - If you installed `flux` in a specific environment, you have to write the absolute path to `flux` in the `argv` array. - -#### Step 2: Restart Jupyter Notebook - -1. **Restart the Jupyter Notebook Server**: - - Close the current Jupyter Notebook server and restart it: - - ```bash - jupyter notebook - ``` - - ```bash - jupyter lab - ``` - - Or simply restart your server. - -2. **Select the Flux Kernel**: - - When creating a new notebook or changing the kernel of an existing one, you should see an option for "Flux" in the list of available kernels. Select it to run your code with the Flux environment. - -#### Step 3: Run Your Code with `FluxExecutor` - -Now, your Jupyter environment is set up to use `flux-core`. You can run your code like this: - -```python -import flux.job - -# Use FluxExecutor within the Flux kernel -with flux.job.FluxExecutor() as flux_exe: - print("FluxExecutor is running within the Jupyter Notebook") -``` +# Trouble Shooting +Some of the most frequent issues are covered below, for everything else do not be shy and [open an issue on Github](https://github.com/pyiron/executorlib/issues). + +## Filesystem Usage +The cache of executorlib is not removed after the Python process completed. So it is the responsibility of the user to +clean up the cache directory they created. This can be easily forgot, so it is important to check for remaining cache +directories from time to time and remove them. + +## Firewall Issues +MacOS comes with a rather strict firewall, which does not allow to connect to an MacOS computer using the hostname even +if it is the hostname of the current computer. MacOS only supports connections based on the hostname `localhost`. To use +`localhost` rather than the hostname to connect to the Python processes executorlib uses for the execution of the Python +function, executorlib provides the option to set `hostname_localhost=True`. 
For MacOS this option is enabled by default, +still if other operating systems implement similar strict firewall rules, the option can also be set manually to enabled +local mode on computers with strict firewall rules. + +## Message Passing Interface +To use the message passing interface (MPI) executorlib requires [mpi4py](https://mpi4py.readthedocs.io/) as optional +dependency. The installation of this and other optional dependencies is covered in the [installation section](). + +## Missing Dependencies +The default installation of executorlib only comes with a limited number of dependencies, especially the [zero message queue](https://zeromq.org) +and [cloudpickle](https://github.com/cloudpipe/cloudpickle). Additional features like [caching](), [HPC submission mode]() +and [HPC allocation mode]() require additional dependencies. The dependencies are explained in more detail in the +[installation section](). + +## Python Version +Executorlib supports all current Python version ranging from 3.9 to 3.13. Still some of the dependencies and especially +the [flux](http://flux-framework.org) job scheduler are currently limited to Python 3.12 and below. Consequently for high +performance computing installations Python 3.12 is the recommended Python verion. + +## Resource Dictionary +The resource dictionary parameter `resource_dict` can contain one or more of the following options: +* `cores_per_worker` (int): number of MPI cores to be used for each function call +* `threads_per_core` (int): number of OpenMP threads to be used for each function call +* `gpus_per_worker` (int): number of GPUs per worker - defaults to 0 +* `cwd` (str/None): current working directory where the parallel python task is executed +* `openmpi_oversubscribe` (bool): adds the `--oversubscribe` command line flag (OpenMPI and SLURM only) - default False +* `slurm_cmd_args` (list): Additional command line arguments for the srun call (SLURM only) + +For the special case of the [HPC allocation mode]() the resource dictionary parameter `resource_dict` can also include +additional parameters define in the submission script of the [Python simple queuing system adatper (pysqa)](https://pysqa.readthedocs.io) +these include but are not limited to: +* `run_time_max` (int): the maximum time the execution of the submitted Python function is allowed to take in seconds. +* `memory_max` (int): the maximum amount of memory the Python function is allowed to use in Gigabytes. +* `partition` (str): the partition of the queuing system the Python function is submitted to. +* `queue` (str): the name of the queue the Python function is submitted to. + +All parameters in the resource dictionary `resource_dict` are optional. + +## SSH Connection +While the [Python simple queuing system adatper (pysqa)](https://pysqa.readthedocs.io) provides the option to connect to +high performance computing (HPC) clusters via SSH, this functionality is not supported for executorlib. The background +is the use of [cloudpickle](https://github.com/cloudpipe/cloudpickle) for serialization inside executorlib, this requires +the same Python version and dependencies on both computer connected via SSH. As tracking those parameters is rather +complicated the SSH connection functionality of [pysqa](https://pysqa.readthedocs.io) is not officially supported in +executorlib. 
\ No newline at end of file diff --git a/executorlib/__init__.py b/executorlib/__init__.py index 16358677..5e0a1c94 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -1,7 +1,10 @@ from typing import Optional from executorlib._version import get_versions as _get_versions -from executorlib.interactive.executor import ExecutorWithDependencies, create_executor +from executorlib.interactive.executor import ( + ExecutorWithDependencies as _ExecutorWithDependencies, +) +from executorlib.interactive.executor import create_executor as _create_executor from executorlib.standalone.inputcheck import ( check_plot_dependency_graph as _check_plot_dependency_graph, ) @@ -199,7 +202,7 @@ def __new__( ) elif not disable_dependencies: _check_pysqa_config_directory(pysqa_config_directory=pysqa_config_directory) - return ExecutorWithDependencies( + return _ExecutorWithDependencies( max_workers=max_workers, backend=backend, cache_directory=cache_directory, @@ -218,7 +221,7 @@ def __new__( _check_pysqa_config_directory(pysqa_config_directory=pysqa_config_directory) _check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph) _check_refresh_rate(refresh_rate=refresh_rate) - return create_executor( + return _create_executor( max_workers=max_workers, backend=backend, cache_directory=cache_directory, diff --git a/notebooks/1-local.ipynb b/notebooks/1-local.ipynb new file mode 100644 index 00000000..e656974d --- /dev/null +++ b/notebooks/1-local.ipynb @@ -0,0 +1,855 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "6915218e-7cf3-4bf4-9618-7e6942b4762f", + "metadata": {}, + "source": [ + "# Local Mode\n", + "The local mode in executorlib, which is selected by setting the `backend` parameter to `\"local\"`, is primarily used to enable rapid prototyping on a workstation computer to test your parallel Python program with executorlib before transferring it to an high performance computer (HPC). When the `backend` parameter is not set, it defaults to `\"local\"`. With the added capability of executorlib it is typically 10% slower than the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) from the Python standard library on a single node, when all acceleration features are enabled. This overhead is primarily related to the creation of new tasks. So the performance of executorlib improves when the individual Python function calls require extensive computations. \n", + "\n", + "An advantage that executorlib has over the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) and the [ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) from the Python standard libary, is the use of [cloudpickle](https://github.com/cloudpipe/cloudpickle) as serialization backend to transfer Python functions between processes. This enables the use of dynamically defined Python functions for example in the case of a Jupyter notebook. " + ] + }, + { + "cell_type": "markdown", + "id": "ccc686dd-8fc5-4755-8a19-f40010ebb1b8", + "metadata": {}, + "source": [ + "## Basic Functionality\n", + "The general functionality of executorlib follows the [Executor interface](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor) of the Python standard library. 
You can import the `Executor` class directly from executorlib and then just replace the [ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) or [ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) with the `Executor` class to start using executorlib. " + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "b1907f12-7378-423b-9b83-1b65fc0a20f5", + "metadata": {}, + "outputs": [], + "source": [ + "from executorlib import Executor" + ] + }, + { + "cell_type": "markdown", + "id": "1654679f-38b3-4699-9bfe-b48cbde0b2db", + "metadata": {}, + "source": [ + "It is recommended to use the `Executor` class in combination with a `with`-statement. This gurantees the processes created by the `Executor` class to evaluate the Python functions are afterward closed and do not remain ghost processes. A function is then submitted using the `submit(fn, /, *args, **kwargs)` function which executes a given function `fn` as `fn(*args, **kwargs)`. The `submit()` function returns a [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object, as defined by the Python Standard Library. As a first example we submit the function `sum()` to calculate the sum of the list `[1, 1]`:" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "16f7d138-ed77-45ea-a554-d329f7237500", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "2\n", + "CPU times: user 100 ms, sys: 70.7 ms, total: 171 ms\n", + "Wall time: 1.94 s\n" + ] + } + ], + "source": [ + "%%time\n", + "with Executor(backend=\"local\") as exe:\n", + " future = exe.submit(sum, [1, 1])\n", + " print(future.result())" + ] + }, + { + "cell_type": "markdown", + "id": "a1109584-9db2-4f9d-b3ed-494d96241396", + "metadata": {}, + "source": [ + "As expected the result of the summation `sum([1, 1])` is `2`. The same result is retrieved from the [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object received from the submission of the `sum()` as it is printed here `print(future.result())`. For most Python functions and especially the `sum()` function it is computationally not efficient to initialize the `Executor` class only for the execution of a single function call, rather it is more computationally efficient to initialize the `Executor` class once and then submit a number of functions. This can be achieved with a loop. For example the sum of the pairs `[2, 2]`, `[3, 3]` and `[4, 4]` can be achieved with a for-loop inside the context of the `Executor()` class as provided by the `with`-statement. 
" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "cfccdf9a-b23b-4814-8c14-36703a8a5f9e", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[4, 6, 8]\n", + "CPU times: user 49.4 ms, sys: 29.2 ms, total: 78.7 ms\n", + "Wall time: 1.75 s\n" + ] + } + ], + "source": [ + "%%time\n", + "with Executor(backend=\"local\") as exe:\n", + " future_lst = [exe.submit(sum, [i, i]) for i in range(2, 5)]\n", + " print([f.result() for f in future_lst])" + ] + }, + { + "cell_type": "markdown", + "id": "7db58f70-8137-4f1c-a87b-0d282f2bc3c5", + "metadata": {}, + "source": [ + "If only the parameters change but the function, which is applied to these parameters, remains the same, like in the case above the `sum()` function is applied to three pairs of parameters, then the `map(fn, *iterables, timeout=None, chunksize=1)` function can be used to map the function to the different sets of parameters - as it is defined in the [Python standard library](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.map). " + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "abd0beb7-471d-490e-bb9c-96755bd7aacf", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[10, 12, 14]\n", + "CPU times: user 40.5 ms, sys: 28.1 ms, total: 68.6 ms\n", + "Wall time: 1.09 s\n" + ] + } + ], + "source": [ + "%%time\n", + "with Executor(backend=\"local\") as exe:\n", + " results = exe.map(sum, [[5, 5], [6, 6], [7, 7]])\n", + " print(list(results))" + ] + }, + { + "cell_type": "markdown", + "id": "ac86bf47-4eb6-4d7c-acae-760b880803a8", + "metadata": {}, + "source": [ + "These three examples cover the general functionality of the `Executor` class. Following the [Executor](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor) interface as it is defined in the Python standard library. " + ] + }, + { + "cell_type": "markdown", + "id": "5de0f0f2-bf5c-46b3-8171-a3a206ce6775", + "metadata": {}, + "source": [ + "## Parallel Functions\n", + "Writing parallel software is not trivial. So rather than writing the whole Python program in a parallel way, executorlib allows developers to implement parallel execution on a function by function level. In this way individual functions can be replaced by parallel functions as needed without the need to modify the rest of the program. With the Local Mode executorlib supports two levels of parallel execution, parallel execution based on the Message Passing Interface (MPI) using the [mpi4py](https://mpi4py.readthedocs.io) package, or thread based parallel execution. Both levels of parallelism can be defined inside the function and do not require any modifications to the rest of the Python program. " + ] + }, + { + "cell_type": "markdown", + "id": "dc8e692f-bf6c-4838-bb82-6a6b8454a2e7", + "metadata": {}, + "source": [ + "### MPI Parallel Functions\n", + "MPI is the default way to develop parallel programs for HPCs. Still it can be challenging to refactor a previously serial program to efficiently use MPI to achieve optimal computational efficiency for parallel execution, even with libraries like [mpi4py](https://mpi4py.readthedocs.io). To simplify the up-scaling of Python programs executorlib provides the option to use MPI parallel Python code inside a given Python function and then submit this parallel Python function to an `Executor` for evaluation. 
\n", + "\n", + "The following `calc_mpi()` function imports the [mpi4py](https://mpi4py.readthedocs.io) package and then uses the internal functionality of MPI to get the total number of parallel CPU cores in the current MPI group `MPI.COMM_WORLD.Get_size()` and the index of the current processor in the MPI group `MPI.COMM_WORLD.Get_rank()`.\n", + "\n", + "The [mpi4py](https://mpi4py.readthedocs.io) package is an optional dependency of executorlib. The installation of the [mpi4py](https://mpi4py.readthedocs.io) package is covered in the installation section." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "a251d083-489e-41c1-9e49-c86093858006", + "metadata": {}, + "outputs": [], + "source": [ + "def calc_mpi(i):\n", + " from mpi4py import MPI\n", + "\n", + " size = MPI.COMM_WORLD.Get_size()\n", + " rank = MPI.COMM_WORLD.Get_rank()\n", + " return i, size, rank" + ] + }, + { + "cell_type": "markdown", + "id": "adbf8a10-04e1-4fd9-8768-4375bcba9ec3", + "metadata": {}, + "source": [ + "The computational resources for the execution of the `calc_mpi()` Python function are defined using the resource dictionary parameter `resource_dict={}`. The reseource dictionary can either be provided as additional parameter for the `submit()` function. It is important that the parameter name `resource_dict` is reserved exclusively for the `submit()` function and cannot be used in the function which is submitted, like the `calc_mpi()` function in this example:" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "266864f1-d29e-4934-9b5d-51f4ffb11f5c", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[(3, 2, 0), (3, 2, 1)]\n" + ] + } + ], + "source": [ + "with Executor(backend=\"local\") as exe:\n", + " fs = exe.submit(calc_mpi, 3, resource_dict={\"cores\": 2})\n", + " print(fs.result())" + ] + }, + { + "cell_type": "markdown", + "id": "3a449e3f-d7a4-4056-a1d0-35dfca4dad22", + "metadata": {}, + "source": [ + "Another option is to set the resource dictionary parameter `resource_dict` during the initialization of the `Executor`. In this case it is internally set for every call of the `submit()` function, without the need to specify it again." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "cb4ad978-bdf2-47bb-a7df-846641a54ec2", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[(3, 2, 0), (3, 2, 1)]\n" + ] + } + ], + "source": [ + "with Executor(backend=\"local\", resource_dict={\"cores\": 2}) as exe:\n", + " fs = exe.submit(calc_mpi, 3)\n", + " print(fs.result())" + ] + }, + { + "cell_type": "markdown", + "id": "c1d1d7b1-64fa-4e47-bbde-2a16036568d6", + "metadata": {}, + "source": [ + "In addition, to the compute cores `cores`, the resource dictionary parameter `resource_dict` can also define the threads per core as `threads_per_core`, the GPUs per core as `gpus_per_core`, the working directory with `cwd`, the option to use the OpenMPI oversubscribe feature with `openmpi_oversubscribe` and finally for the [Simple Linux Utility for Resource \n", + "Management (SLURM)](https://slurm.schedmd.com) queuing system the option to provide additional command line arguments with the `slurm_cmd_args` parameter - [resource dictionary]()." 
+ ] + }, + { + "cell_type": "markdown", + "id": "4f5c5221-d99c-4614-82b1-9d6d3260c1bf", + "metadata": {}, + "source": [ + "### Thread Parallel Functions\n", + "An alternative option of parallelism is [thread based parallelism](https://docs.python.org/3/library/threading.html). executorlib supports thread based parallelism with the `threads_per_core` parameter in the resource dictionary `resource_dict`. Given the [global interpreter lock](https://docs.python.org/3/glossary.html#term-global-interpreter-lock) in the cPython implementation a common application of thread based parallelism in Python is using additional threads in linked libraries. The number of threads is commonly controlled with environment variables like `OMP_NUM_THREADS`, `OPENBLAS_NUM_THREADS`, `MKL_NUM_THREADS`, `VECLIB_MAXIMUM_THREADS` and `NUMEXPR_NUM_THREADS`. Specific libraries might require other environment variables. The environment variables can be set using the environment interface of the Python standard library `os.environ`." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "7a7d21f6-9f1a-4f30-8024-9993e156dc75", + "metadata": {}, + "outputs": [], + "source": [ + "def calc_with_threads(i):\n", + " import os\n", + "\n", + " os.environ[\"OMP_NUM_THREADS\"] = \"2\"\n", + " os.environ[\"OPENBLAS_NUM_THREADS\"] = \"2\"\n", + " os.environ[\"MKL_NUM_THREADS\"] = \"2\"\n", + " os.environ[\"VECLIB_MAXIMUM_THREADS\"] = \"2\"\n", + " os.environ[\"NUMEXPR_NUM_THREADS\"] = \"2\"\n", + " import numpy as np\n", + "\n", + " return i" + ] + }, + { + "cell_type": "markdown", + "id": "82ed8f46-836c-402e-9363-be6e16c2a0b0", + "metadata": {}, + "source": [ + "Again the resource dictionary parameter `resource_dict` can be set either in the `submit()` function:" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "b8ed330d-ee77-44a0-a02f-670fa945b043", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "3\n" + ] + } + ], + "source": [ + "with Executor(backend=\"local\") as exe:\n", + " fs = exe.submit(calc_with_threads, 3, resource_dict={\"threads_per_core\": 2})\n", + " print(fs.result())" + ] + }, + { + "cell_type": "markdown", + "id": "63222cd5-664b-4aba-a80c-5814166b1239", + "metadata": {}, + "source": [ + "Or alternatively, the resource dictionary parameter `resource_dict` can also be set during the initialization of the `Executor` class:" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "31562f89-c01c-4e7a-bbdd-fa26ca99e68b", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "3\n" + ] + } + ], + "source": [ + "with Executor(backend=\"local\", resource_dict={\"threads_per_core\": 2}) as exe:\n", + " fs = exe.submit(calc_with_threads, 3)\n", + " print(fs.result())" + ] + }, + { + "cell_type": "markdown", + "id": "8b78a7b4-066e-4cbc-858e-606c8bbbbf0c", + "metadata": {}, + "source": [ + "For most cases MPI based parallelism leads to higher computational efficiency in comparison to thread based parallelism, still the choice of parallelism depends on the specific Python function which should be executed in parallel. Careful benchmarks are required to achieve the optimal performance for a given computational architecture. \n", + "\n", + "Beyond MPI based parallelism and thread based parallelism the [HPC Submission Mode]() and the [HPC Allocation Mode]() also provide the option to assign GPUs to the execution of individual Python functions. 
" + ] + }, + { + "cell_type": "markdown", + "id": "ca9bc450-2762-4d49-b7f8-48cc83e068fd", + "metadata": {}, + "source": [ + "## Performance Optimization\n", + "The default settings of executorlib are chosen to favour stability over performance. Consequently, the performance of executorlib can be improved by setting additional parameters. It is commonly recommended to start with an initial implementation based on executorlib and then improve the performance by enabling specialized features." + ] + }, + { + "cell_type": "markdown", + "id": "e9b52ecf-3984-4695-98e7-315aa3712104", + "metadata": {}, + "source": [ + "### Block Allocation\n", + "By default each submitted Python function is executed in a dedicated process. This gurantees that the execution of the submitted Python function starts with a fresh process. Still the initialization of the Python process takes time. Especially when the call of the Python function requires only limited computational resources it makes sense to reuse the existing Python process for the execution of multiple Python functions. In executorlib this functionality is enabled by setting the `block_allocation` parameter to `Ture`. To limit the number of parallel Python processes when using block allocation it is recommended to set the `max_workers` parameter to restrict the number of available computing resources. " + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "0da4c7d0-2268-4ea8-b62d-5d94c79ebc72", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "2\n", + "CPU times: user 37.1 ms, sys: 21.8 ms, total: 58.9 ms\n", + "Wall time: 1.09 s\n" + ] + } + ], + "source": [ + "%%time\n", + "with Executor(max_workers=2, backend=\"local\", block_allocation=True) as exe:\n", + " future = exe.submit(sum, [1, 1])\n", + " print(future.result())" + ] + }, + { + "cell_type": "markdown", + "id": "d38163b3-1c04-431c-964b-2bad4f823a4d", + "metadata": {}, + "source": [ + "The same functionality also applies to MPI parallel Python functions. The important part is that while it is possible to assign more than one Python process to the execution of a Python function in block allocation mode, it is not possible to assign resources during the submission of the function with the `submit()` function. Starting again with the `calc_mpi()` function: " + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "cb8c4943-4c78-4203-95f2-1db758e588d9", + "metadata": {}, + "outputs": [], + "source": [ + "def calc_mpi(i):\n", + " from mpi4py import MPI\n", + "\n", + " size = MPI.COMM_WORLD.Get_size()\n", + " rank = MPI.COMM_WORLD.Get_rank()\n", + " return i, size, rank" + ] + }, + { + "cell_type": "markdown", + "id": "9e1212c4-e3fb-4e21-be43-0a4f0a08b856", + "metadata": {}, + "source": [ + "Still the resource dictionary parameter can still be set during the initialisation of the `Executor` class. Internally, this groups the created Python processes in fixed allocations and afterwards submit Python functions to these allocations." + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "5ebf7195-58f9-40f2-8203-2d4b9f0e9602", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[(3, 2, 0), (3, 2, 1)]\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "--------------------------------------------------------------------------\n", + "A system call failed during shared memory initialization that should\n", + "not have. 
It is likely that your MPI job will now either abort or\n", + "experience performance degradation.\n", + "\n", + " Local host: MacBook-Pro.local\n", + " System call: unlink(2) /var/folders/z7/3vhrmssx60v240x_ndq448h80000gn/T//ompi.MacBook-Pro.501/pid.22031/1/vader_segment.MacBook-Pro.501.17620001.1\n", + " Error: No such file or directory (errno 2)\n", + "--------------------------------------------------------------------------\n", + "--------------------------------------------------------------------------\n", + "A system call failed during shared memory initialization that should\n", + "not have. It is likely that your MPI job will now either abort or\n", + "experience performance degradation.\n", + "\n", + " Local host: MacBook-Pro.local\n", + " System call: unlink(2) /var/folders/z7/3vhrmssx60v240x_ndq448h80000gn/T//ompi.MacBook-Pro.501/pid.22028/1/vader_segment.MacBook-Pro.501.17610001.1\n", + " Error: No such file or directory (errno 2)\n", + "--------------------------------------------------------------------------\n", + "--------------------------------------------------------------------------\n", + "A system call failed during shared memory initialization that should\n", + "not have. It is likely that your MPI job will now either abort or\n", + "experience performance degradation.\n", + "\n", + " Local host: MacBook-Pro.local\n", + " System call: unlink(2) /var/folders/z7/3vhrmssx60v240x_ndq448h80000gn/T//ompi.MacBook-Pro.501/pid.22030/1/vader_segment.MacBook-Pro.501.17630001.1\n", + " Error: No such file or directory (errno 2)\n", + "--------------------------------------------------------------------------\n", + "--------------------------------------------------------------------------\n", + "A system call failed during shared memory initialization that should\n", + "not have. It is likely that your MPI job will now either abort or\n", + "experience performance degradation.\n", + "\n", + " Local host: MacBook-Pro.local\n", + " System call: unlink(2) /var/folders/z7/3vhrmssx60v240x_ndq448h80000gn/T//ompi.MacBook-Pro.501/pid.22029/1/vader_segment.MacBook-Pro.501.17600001.1\n", + " Error: No such file or directory (errno 2)\n", + "--------------------------------------------------------------------------\n" + ] + } + ], + "source": [ + "with Executor(\n", + " backend=\"local\", resource_dict={\"cores\": 2}, block_allocation=True\n", + ") as exe:\n", + " fs = exe.submit(calc_mpi, 3)\n", + " print(fs.result())" + ] + }, + { + "cell_type": "markdown", + "id": "b75fb95f-f2f5-4be9-9f2a-9c2e9961c644", + "metadata": {}, + "source": [ + "The weakness of memory from a previous Python function remaining in the Python process can at the same time be an advantage for working with large datasets. In executorlib this is achieved by introducing the `init_function` parameter. The `init_function` returns a dictionary of parameters which can afterwards be reused as keyword arguments `**kwargs` in the functions submitted to the `Executor`. When block allocation `block_allocation` is disabled this functionality is not available, as each function is executed in a separate process, so no data can be preloaded." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "8aa754cc-eb1a-4fa1-bd72-272246df1d2f", + "metadata": {}, + "outputs": [], + "source": [ + "def init_function():\n", + " return {\"j\": 4, \"k\": 3, \"l\": 2}" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "1854895a-7239-4b30-b60d-cf1a89234464", + "metadata": {}, + "outputs": [], + "source": [ + "def calc_with_preload(i, j, k):\n", + " return i + j + k" + ] + }, + { + "cell_type": "markdown", + "id": "d07cf107-3627-4cb0-906c-647497d6e0d2", + "metadata": {}, + "source": [ + "The function `calc_with_preload()` requires three inputs `i`, `j` and `k`. But when the function is submitted to the executor only two inputs are provided `fs = exe.submit(calc, 2, j=5)`. In this case the first input parameter is mapped to `i=2`, the second input parameter is specified explicitly `j=5` but the third input parameter `k` is not provided. So the `Executor` automatically checks the keys set in the `init_function()` function. In this case the returned dictionary `{\"j\": 4, \"k\": 3, \"l\": 2}` defines `j=4`, `k=3` and `l=2`. For this specific call of the `calc_with_preload()` function, `i` and `j` are already provided so `j` is not required, but `k=3` is used from the `init_function()` and as the `calc_with_preload()` function does not define the `l` parameter this one is also ignored." + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "cc648799-a0c6-4878-a469-97457bce024f", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "10\n" + ] + } + ], + "source": [ + "with Executor(\n", + " backend=\"local\", init_function=init_function, block_allocation=True\n", + ") as exe:\n", + " fs = exe.submit(calc_with_preload, 2, j=5)\n", + " print(fs.result())" + ] + }, + { + "cell_type": "markdown", + "id": "1073b8ca-1492-46e9-8d1f-f52ad48d28a2", + "metadata": {}, + "source": [ + "The result is `2+5+3=10` as `i=2` and `j=5` are provided during the submission and `k=3` is defined in the `init_function()` function." + ] + }, + { + "cell_type": "markdown", + "id": "24397d78-dff1-4834-830c-a8f390fe6b9c", + "metadata": {}, + "source": [ + "### Cache\n", + "The development of scientific workflows is commonly an interactive process, extending the functionality step by step. This lead to the development of interactive environments like [Jupyter](https://jupyter.org) which is fully supported by executorlib. Still many of the computationally intensive Python functions can take in the order of minutes to hours or even longer to execute, so reusing an existing Python process is not feasible. To address this challenge executorlib provides a file based cache to store the results of previously computed [concurrent future Futures](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects. The results are serialized using [cloudpickle](https://github.com/cloudpipe/cloudpickle) and stored in a user-defined cache directory `cache_directory` to be reloaded later on. Internally, the hierarchical data format (HDF5) is used via the [h5py](https://www.h5py.org), which is an optional dependency for executorlib. \n", + "\n", + "The [h5py](https://www.h5py.org) package is an optional dependency of executorlib. The installation of the [h5py](https://www.h5py.org) package is covered in the installation section. 
" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "ecdcef49-5c89-4538-b377-d53979673bf7", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[2, 4, 6]\n", + "CPU times: user 547 ms, sys: 161 ms, total: 708 ms\n", + "Wall time: 1.33 s\n" + ] + } + ], + "source": [ + "%%time\n", + "with Executor(backend=\"local\", cache_directory=\"./cache\") as exe:\n", + " future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n", + " print([f.result() for f in future_lst])" + ] + }, + { + "cell_type": "markdown", + "id": "32d0fb2e-5ac1-4249-b6c8-953c92fdfded", + "metadata": {}, + "source": [ + "When the same code is executed again, executorlib finds the existing results in the cache directory specified by the `cache_directory` parameter and reloads the result, accelerating the computation especially during the prototyping phase when similar computations are repeated frequently for testing. \n", + "\n", + "Still it is important to mention, that this cache is not designed to identify the submission of the same parameters within the context of one `with`-statement. It is the task of the user to minimize duplicate computations, the cache is only designed to restore previous calculation results when the Python process managing executorlib was stopped after the successful execution. " + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "c39babe8-4370-4d31-9520-9a7ce63378c8", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[2, 4, 6]\n", + "CPU times: user 52.1 ms, sys: 41.1 ms, total: 93.2 ms\n", + "Wall time: 1.13 s\n" + ] + } + ], + "source": [ + "%%time\n", + "with Executor(backend=\"local\", cache_directory=\"./cache\") as exe:\n", + " future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n", + " print([f.result() for f in future_lst])" + ] + }, + { + "cell_type": "markdown", + "id": "68092479-e846-494a-9ac9-d9638b102bd8", + "metadata": {}, + "source": [ + "After the development phase is concluded it is the task of the user to remove the cache directory defined with the `cache_directory` parameter. The cache directory is never removed by executorlib to prevent the repeation of expensive computations. Still as disk space on shared file systems in HPC environments is commonly limited it is recommended to remove the cache directory once the development process concluded. " + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "34a9316d-577f-4a63-af14-736fb4e6b219", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "['sumb6a5053f96b7031239c2e8d0e7563ce4.h5out', 'sum5171356dfe527405c606081cfbd2dffe.h5out', 'sumd1bf4ee658f1ac42924a2e4690e797f4.h5out']\n" + ] + } + ], + "source": [ + "import os\n", + "import shutil\n", + "\n", + "cache_dir = \"./cache\"\n", + "if os.path.exists(cache_dir):\n", + " print(os.listdir(cache_dir))\n", + " try:\n", + " shutil.rmtree(cache_dir)\n", + " except OSError:\n", + " pass" + ] + }, + { + "cell_type": "markdown", + "id": "1cea95b5-4110-444c-82af-fa6718bfa17f", + "metadata": {}, + "source": [ + "Typically the use of the cache is recommended for development processes only and for production workflows the user should implement their own long-term storage solution. The binary format used by executorlib is based on [cloudpickle](https://github.com/cloudpipe/cloudpickle) and might change in future without further notice, rendering existing data in the cache unusable. 
Consequently, using the cache beyond the development process is not recommended. In addition the writing of the results to files might result in additional overhead for accessing the shared file system. " + ] + }, + { + "cell_type": "markdown", + "id": "71a8a0be-a933-4e83-9da5-50da35e9975b", + "metadata": {}, + "source": [ + "### Dependencies\n", + "Many scientific Python programs consist of series of Python function calls with varying level of parallel computations or map-reduce patterns where the same function is first mapped to a number of parameters and afterwards the results are reduced in a single function. To extend the [Executor interface](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor) of the Python standard library to support this programming pattern, the `Executor` class from executorlib supports submitting Python [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) objects to the `Executor` which are resolved before submission. So the `Executor` internally waits until all Python [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) objects are successfully executed before it triggers the execution of the submitted Python function. " + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "d8b75a26-479d-405e-8895-a8d56b3f0f4b", + "metadata": {}, + "outputs": [], + "source": [ + "def calc_add(a, b):\n", + " return a + b" + ] + }, + { + "cell_type": "markdown", + "id": "36118ae0-c13c-4f7a-bcd3-3d7f4bb5a078", + "metadata": {}, + "source": [ + "For example the function which adds two numbers `calc_add()` is used in a loop which adds a counter to the previous numbers. In the first iteration the `future` parameter is set to `0` but already in the second iteration it is the Python [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object of the first iteration and so on. \n", + "\n", + "The important part is that the user does not have to wait until the first function is executed but instead the waiting happens internally in the `Executor`. " + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "id": "35fd5747-c57d-4926-8d83-d5c55a130ad6", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "6\n" + ] + } + ], + "source": [ + "with Executor(backend=\"local\") as exe:\n", + " future = 0\n", + " for i in range(1, 4):\n", + " future = exe.submit(calc_add, i, future)\n", + " print(future.result())" + ] + }, + { + "cell_type": "markdown", + "id": "38e1bbb3-1028-4f50-93c1-d2427f399a7d", + "metadata": {}, + "source": [ + "As the reusing of existing [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) object can lead to rather complex dependencies, executorlib provides the option to plot the dependency graph by setting the `plot_dependency_graph=True` during the initialization of the `Executor` class. \n", + "\n", + "No computation is executed when the `plot_dependency_graph=True` is set. This parameter is for debugging only. \n", + "\n", + "Internally, the [pygraphviz](https://pygraphviz.github.io/documentation/stable) package is used for the visualisation of these dependency graphs. It is an optional dependency of executorlib. 
The installation of the [pygraphviz](https://pygraphviz.github.io/documentation/stable) package is covered in the installation section. " + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "id": "f67470b5-af1d-4add-9de8-7f259ca67324", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "None\n" + ] + }, + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "0\n", + "\n", + "calc_add\n", + "\n", + "\n", + "\n", + "1\n", + "\n", + "calc_add\n", + "\n", + "\n", + "\n", + "0->1\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "2\n", + "\n", + "calc_add\n", + "\n", + "\n", + "\n", + "1->2\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "3\n", + "\n", + "1\n", + "\n", + "\n", + "\n", + "3->0\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "4\n", + "\n", + "0\n", + "\n", + "\n", + "\n", + "4->0\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "5\n", + "\n", + "2\n", + "\n", + "\n", + "\n", + "5->1\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "6\n", + "\n", + "3\n", + "\n", + "\n", + "\n", + "6->2\n", + "\n", + "\n", + "\n", + "\n", + "" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "with Executor(backend=\"local\", plot_dependency_graph=True) as exe:\n", + " future = 0\n", + " for i in range(1, 4):\n", + " future = exe.submit(calc_add, i, future)\n", + " print(future.result())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "67b52bd0-bd51-4538-a089-2776b8034547", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.5" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/notebooks/2-hpc-submission.ipynb b/notebooks/2-hpc-submission.ipynb new file mode 100644 index 00000000..28c9f25f --- /dev/null +++ b/notebooks/2-hpc-submission.ipynb @@ -0,0 +1,277 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "ddf66f38-dc4a-4306-8b1c-b923fdb76922", + "metadata": {}, + "source": [ + "# HPC Submission Mode\n", + "In contrast to the [local mode] and the [HPC allocation mode] the HPC Submission Mode does not communicate via the [zero message queue](https://zeromq.org) but instead stores the python functions on the file system and uses the job scheduler to handle the dependencies of the Python functions. Consequently, the block allocation `block_allocation` and the init function `init_function` are not available in HPC Submission mode. At the same time it is possible to close the Python process which created the `Executor`, wait until the execution of the submitted Python functions is completed and afterwards reload the results from the cache. \n", + "\n", + "Internally the HPC submission mode is using the [Python simple queuing system adatper (pysqa)](https://pysqa.readthedocs.io) to connect to HPC job schedulers and the [h5py](https://www.h5py.org) package for serializing the Python functions to store them on the file system. Both packages are optional dependency of executorlib. The installation of the [pysqa](https://pysqa.readthedocs.io) package and the [h5py](https://www.h5py.org) package are covered in the installation section. 
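As a minimal sketch of this workflow (assuming a SLURM cluster and a cache directory on a shared file system; the exact blocking behaviour of the context manager may differ), a function can be submitted in one Python session and its cached result reloaded in a later session by resubmitting the same call with the same `cache_directory`:

```python
from executorlib import Executor

# First session: submit the function via the job scheduler. The serialized
# task and, once the job finished, its result end up in the cache directory.
with Executor(backend="slurm_submission", cache_directory="./cache") as exe:
    future = exe.submit(sum, [1, 1])

# Later session (possibly a new Python process): resubmitting the same call
# with the same cache directory reloads the cached result instead of
# recomputing it.
with Executor(backend="slurm_submission", cache_directory="./cache") as exe:
    print(exe.submit(sum, [1, 1]).result())  # 2
```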
" + ] + }, + { + "cell_type": "markdown", + "id": "d56862a6-8279-421d-a090-7ca2a3c4d416", + "metadata": {}, + "source": [ + "## SLURM\n", + "The [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com) job scheduler is currently the most commonly used job scheduler for HPC clusters. In the HPC submission mode executorlib internally uses the [sbatch](https://slurm.schedmd.com/sbatch.html) command this is in contrast to the [HPC allocatiom mode] which internally uses the [srun](https://slurm.schedmd.com/srun.html) command. \n", + "\n", + "The connection to the job scheduler is based on the [Python simple queuing system adatper (pysqa)](https://pysqa.readthedocs.io). It provides a default configuration for most commonly used job schedulers including SLURM, in addition it is also possible to provide the submission template as part of the resource dictionary `resource_dict` or via the path to the configuration directory with the `pysqa_config_directory` parameter. All three options are covered in more detail on the [pysqa documentation](https://pysqa.readthedocs.io)." + ] + }, + { + "cell_type": "markdown", + "id": "db7760e8-35a6-4a1c-8b0f-410b536c3835", + "metadata": {}, + "source": [ + "```python\n", + "from executorlib import Executor\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "b20913f3-59e4-418c-a399-866124f8e497", + "metadata": {}, + "source": [ + "In comparison to the [Local Mode](), the only two parameters which are changed are the specification of the backend as `backend=\"slurm_submission\"` and the requirement to specify the cache directory using the `cache_directory=\"./cache\"`. The rest of the syntax remains exactly the same, to simplify the up-scaling of simulation workflows. " + ] + }, + { + "cell_type": "markdown", + "id": "0b8f3b77-6199-4736-9f28-3058c5230777", + "metadata": {}, + "source": [ + "```python\n", + "with Executor(backend=\"slurm_submission\", cache_directory=\"./cache\") as exe:\n", + " future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n", + " print([f.result() for f in future_lst])\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "37bef7ac-ce3e-4d8a-b848-b1474c370bca", + "metadata": {}, + "source": [ + "Specific parameters for HPC submission mode like the maximum run time `\"run_time_max\"`, the maximum memory `\"memory_max\"` or the submission template for the job submission script `\"submission_template\"` can be specified as part of the resource dictionary. Again it is possible to specify the resource dictonary `resource_dicionary` either for each function in the `submit()` function or during the initialization of the `Executor`. 
" + ] + }, + { + "cell_type": "markdown", + "id": "658781de-f222-4235-8c26-b0f77a0831b3", + "metadata": {}, + "source": [ + "```python\n", + "submission_template = \"\"\"\\\n", + "#!/bin/bash\n", + "#SBATCH --output=time.out\n", + "#SBATCH --job-name={{job_name}}\n", + "#SBATCH --chdir={{working_directory}}\n", + "#SBATCH --get-user-env=L\n", + "#SBATCH --partition={{partition}}\n", + "{%- if run_time_max %}\n", + "#SBATCH --time={{ [1, run_time_max // 60]|max }}\n", + "{%- endif %}\n", + "{%- if dependency %}\n", + "#SBATCH --dependency=afterok:{{ dependency | join(',') }}\n", + "{%- endif %}\n", + "{%- if memory_max %}\n", + "#SBATCH --mem={{memory_max}}G\n", + "{%- endif %}\n", + "#SBATCH --cpus-per-task={{cores}}\n", + "\n", + "{{command}}\n", + "\"\"\"\n", + "\n", + "with Executor(backend=\"slurm_submission\", cache_directory=\"./cache\") as exe:\n", + " future = exe.submit(\n", + " sum, [4, 4], \n", + " resource_dict={\n", + " \"submission_template\": submission_template, \n", + " \"run_time_max\": 180, # in seconds \n", + " })\n", + " print([f.result() for f in future_lst])\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "f7ad9c97-7743-4f87-9344-4299b2b31a56", + "metadata": {}, + "source": [ + "With these options executorlib in combination with the SLURM job scheduler provides a lot flexibility to configure the submission of Python functions depending on the specific configuration of the job scheduler. " + ] + }, + { + "cell_type": "markdown", + "id": "2a814efb-2fbc-41ba-98df-cf121d19ea66", + "metadata": {}, + "source": [ + "## Flux\n", + "While most HPC job schedulers require extensive configuration before they can be tested, the [flux framework](http://flux-framework.org) can be installed with the conda package manager, as explained in the [installation section](). This simple installation makes the flux framework especially suitable for demonstrations, testing and continous integration. So below a number of features for the HPC submission mode are demonstrated based on the example of the [flux framework](http://flux-framework.org) still the same applies to other job schedulers like SLURM introduced above." + ] + }, + { + "cell_type": "markdown", + "id": "29d7aa18-357e-416e-805c-1322b59abec1", + "metadata": {}, + "source": [ + "### Dependencies\n", + "As already demonstrated for the [Local Mode]() the `Executor` class from executorlib is capable of resolving the dependencies of serial functions, when [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects are used as inputs for subsequent function calls. For the case of the HPC submission these dependencies are communicated to the job scheduler, which allows to stop the Python process which created the `Executor` class, wait until the execution of the submitted Python functions is completed and afterwards restart the Python process for the `Executor` class and reload the calculation results from the cache defined by the `cache_directory` parameter." 
+ ] + }, + { + "cell_type": "markdown", + "id": "3d55176a-facc-4ff5-91cd-690d480bd5b8", + "metadata": {}, + "source": [ + "```python\n", + "def add_funct(a, b):\n", + " return a + b\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "77125681-9344-43c4-8904-46d48cb90104", + "metadata": {}, + "source": [ + "```python\n", + "with Executor(backend=\"flux_submission\", cache_directory=\"./cache\") as exe:\n", + " future = 0\n", + " for i in range(4, 8):\n", + " future = exe.submit(add_funct, i, future)\n", + " print(future.result())\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "ca75cb6c-c50f-4bee-9b09-d8d29d6c263b", + "metadata": {}, + "source": [ + "### Resource Assignment\n", + "In analogy to the [Local Mode]() the resource assignment for the HPC submission mode is handled by either including the resource dictionary parameter `resource_dict` in the initialization of the `Executor` class or in every call of the `submit()` function. \n", + "\n", + "Below this is demonstrated once for the assignment of muliple CPU cores for the execution of a Python function which internally uses the message passing interface (MPI) via the [mpi4py](https://mpi4py.readthedocs.io) package. " + ] + }, + { + "cell_type": "markdown", + "id": "ea800f9a-6915-4b5a-bc57-2e072cc95437", + "metadata": {}, + "source": [ + "```python\n", + "def calc(i):\n", + " from mpi4py import MPI\n", + "\n", + " size = MPI.COMM_WORLD.Get_size()\n", + " rank = MPI.COMM_WORLD.Get_rank()\n", + " return i, size, rank\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "8bebb1b4-25fc-4f57-8633-a2677b712a87", + "metadata": {}, + "source": [ + "```python\n", + "with Executor(backend=\"flux_submission\", cache_directory=\"./cache\") as exe:\n", + " fs = exe.submit(calc, 3, resource_dict={\"cores\": 2})\n", + " print(fs.result())\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "d91499d7-5c6c-4c10-b7b7-bfc4b87ddaa8", + "metadata": {}, + "source": [ + "Beyond CPU cores and threads which were previously also introduced for the [Local Mode]() the HPC submission mode also provides the option to select the available accelerator cards or GPUs, by specifying the `\"gpus_per_core\"` parameter in the resource dictionary `resource_dict`. For demonstration we create a Python function which reads the GPU device IDs and submit it to the `Executor` class:\n", + "```python\n", + "def get_available_gpus():\n", + " import socket\n", + " from tensorflow.python.client import device_lib\n", + " local_device_protos = device_lib.list_local_devices()\n", + " return [\n", + " (x.name, x.physical_device_desc, socket.gethostname()) \n", + " for x in local_device_protos if x.device_type == 'GPU'\n", + " ]\n", + "```\n", + "\n", + "```python\n", + "with Executor(\n", + " backend=\"flux_submission\",\n", + " cache_directory=\"./cache\",\n", + " resource_dict={\"gpus_per_core\": 1}\n", + ") as exe:\n", + " fs_1 = exe.submit(get_available_gpus)\n", + " fs_2 = exe.submit(get_available_gpus)\n", + " print(fs_1.result(), fs_2.result())\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "3f47fd34-04d1-42a7-bb06-6821dc99a648", + "metadata": {}, + "source": [ + "### Cleaning Cache\n", + "Finally, as the HPC Submission Mode leverages the file system to communicate serialized Python functions, it is important to clean up the cache directory specified by the `cache_directory` parameter once the results of the submitted Python functions are no longer needed. 
The serialized Python functions are stored in binary format using the [cloudpickle](https://github.com/cloudpipe/cloudpickle) library for serialization. This format is design for caching but not for long-term storage. The user is responsible for the long-term storage of their data." + ] + }, + { + "cell_type": "markdown", + "id": "481eeb82-9240-4fdf-84ab-87e39681d201", + "metadata": {}, + "source": [ + "```python\n", + "import os\n", + "import shutil\n", + "\n", + "cache_dir = \"./cache\"\n", + "if os.path.exists(cache_dir):\n", + " print(os.listdir(cache_dir))\n", + " try:\n", + " shutil.rmtree(cache_dir)\n", + " except OSError:\n", + " pass\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1de93586-d302-4aa6-878a-51acfb1d3009", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.5" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/notebooks/3-hpc-allocation.ipynb b/notebooks/3-hpc-allocation.ipynb new file mode 100644 index 00000000..f0d2c604 --- /dev/null +++ b/notebooks/3-hpc-allocation.ipynb @@ -0,0 +1,437 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "87c3425d-5abe-4e0b-a948-e371808c322c", + "metadata": {}, + "source": [ + "# HPC Allocation Mode\n", + "In contrast to the [HPC Submission Mode]() which submitts individual Python functions to HPC job schedulers, the HPC Allocation Mode takes a given allocation of the HPC job scheduler and executes Python functions with the resources available in this allocation. In this regard it is similar to the [Local Mode]() as it communicates with the individual Python processes using the [zero message queue](https://zeromq.org/), still it is more advanced as it can access the computational resources of all compute nodes of the given HPC allocation and also provides the option to assign GPUs as accelerators for parallel execution.\n", + "\n", + "Available Functionality: \n", + "* Submit Python functions with the [submit() function or the map() function]().\n", + "* Support for parallel execution, either using the [message passing interface (MPI)](), [thread based parallelism]() or by [assigning dedicated GPUs]() to selected Python functions. All these resources assignments are handled via the [resource dictionary parameter resource_dict]().\n", + "* Performance optimization features, like [block allocation](), [dependency resolution]() and [caching]().\n", + "\n", + "The only parameter the user has to change is the `backend` parameter. " + ] + }, + { + "cell_type": "markdown", + "id": "8c788b9f-6b54-4ce0-a864-4526b7f6f170", + "metadata": {}, + "source": [ + "## SLURM\n", + "With the [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com/) currently being the most commonly used job scheduler, executorlib provides an interface to submit Python functions to SLURM. Internally, this is based on the [srun](https://slurm.schedmd.com/srun.html) command of the SLURM scheduler, which creates job steps in a given allocation. 
Given that all resource requests in SLURM are communicated via a central database a large number of submitted Python functions and resulting job steps can slow down the performance of SLURM. To address this limitation it is recommended to install the hierarchical job scheduler [flux](https://flux-framework.org/) in addition to SLURM, to use flux for distributing the resources within a given allocation. This configuration is discussed in more detail below in the section [SLURM with flux]()." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "133b751f-0925-4d11-99f0-3f8dd9360b54", + "metadata": {}, + "outputs": [], + "source": [ + "from executorlib import Executor" + ] + }, + { + "cell_type": "markdown", + "id": "9b74944e-2ccd-4cb0-860a-d876310ea870", + "metadata": {}, + "source": [ + "```python\n", + "with Executor(backend=\"slurm_allocation\") as exe:\n", + " future = exe.submit(sum, [1, 1])\n", + " print(future.result())\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "36e2d68a-f093-4082-933a-d95bfe7a60c6", + "metadata": {}, + "source": [ + "## SLURM with Flux \n", + "As discussed in the installation section it is important to select the [flux](https://flux-framework.org/) version compatible to the installation of a given HPC cluster. Which GPUs are available? Who manufactured these GPUs? Does the HPC use [mpich](https://www.mpich.org/) or [OpenMPI](https://www.open-mpi.org/) or one of their commercial counter parts like cray MPI or intel MPI? Depending on the configuration different installation options can be choosen, as explained in the [installation section](). \n", + "\n", + "Afterwards flux can be started in an [sbatch](https://slurm.schedmd.com/sbatch.html) submission script using:\n", + "```\n", + "srun flux start python \n", + "```\n", + "In this Python script `` the `\"flux_allocation\"` backend can be used." + ] + }, + { + "cell_type": "markdown", + "id": "68be70c3-af18-4165-862d-7022d35bf9e4", + "metadata": {}, + "source": [ + "### Resource Assignment\n", + "Independent of the selected backend [local mode](), [HPC submission mode]() or HPC allocation mode the assignment of the computational resoruces remains the same. They can either be specified in the `submit()` function by adding the resource dictionary parameter [resource_dict]() or alternatively during the initialization of the `Executor` class by adding the resource dictionary parameter [resource_dict]() there. \n", + "\n", + "This functionality of executorlib is commonly used to rewrite individual Python functions to use MPI while the rest of the Python program remains serial." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "8a2c08df-cfea-4783-ace6-68fcd8ebd330", + "metadata": {}, + "outputs": [], + "source": [ + "def calc_mpi(i):\n", + " from mpi4py import MPI\n", + "\n", + " size = MPI.COMM_WORLD.Get_size()\n", + " rank = MPI.COMM_WORLD.Get_rank()\n", + " return i, size, rank" + ] + }, + { + "cell_type": "markdown", + "id": "715e0c00-7b17-40bb-bd55-b0e097bfef07", + "metadata": {}, + "source": [ + "Depending on the choice of MPI version, it is recommended to specify the pmi standard which [flux](https://flux-framework.org/) should use internally for the resource assignment. For example for OpenMPI >=5 `\"pmix\"` is the recommended pmi standard." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "5802c7d7-9560-4909-9d30-a915a91ac0a1", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[(3, 2, 0), (3, 2, 1)]\n" + ] + } + ], + "source": [ + "with Executor(backend=\"flux_allocation\", flux_executor_pmi_mode=\"pmix\") as exe:\n", + " fs = exe.submit(calc_mpi, 3, resource_dict={\"cores\": 2})\n", + " print(fs.result())" + ] + }, + { + "cell_type": "markdown", + "id": "da862425-08b6-4ced-999f-89a74e85f410", + "metadata": {}, + "source": [ + "### Block Allocation\n", + "The block allocation for the HPC allocation mode follows the same implementation as the [block allocation for the local mode](). It starts by defining the initialization function `init_function()` which returns a dictionary which is internally used to look up input parameters for Python functions submitted to the `Executor` class. Commonly this functionality is used to store large data objects inside the Python process created for the block allocation, rather than reloading these Python objects for each submitted function. " + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "cdc742c0-35f7-47ff-88c0-1b0dbeabe51b", + "metadata": {}, + "outputs": [], + "source": [ + "def init_function():\n", + " return {\"j\": 4, \"k\": 3, \"l\": 2}" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "5ddf8343-ab2c-4469-ac9f-ee568823d4ad", + "metadata": {}, + "outputs": [], + "source": [ + "def calc_with_preload(i, j, k):\n", + " return i + j + k" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "0da13efa-1941-416f-b9e6-bba15b5cdfa2", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "10\n" + ] + } + ], + "source": [ + "with Executor(\n", + " backend=\"flux_allocation\",\n", + " flux_executor_pmi_mode=\"pmix\",\n", + " max_workers=2,\n", + " init_function=init_function,\n", + " block_allocation=True,\n", + ") as exe:\n", + " fs = exe.submit(calc_with_preload, 2, j=5)\n", + " print(fs.result())" + ] + }, + { + "cell_type": "markdown", + "id": "82f3b947-e662-4a0d-b590-9475e0b4f7dd", + "metadata": {}, + "source": [ + "In this example the parameter `k` is used from the dataset created by the initialization function while the parameters `i` and `j` are specified by the call of the `submit()` function. \n", + "\n", + "When using the block allocation mode, it is recommended to set either the maxium number of workers using the `max_workers` parameter or the maximum number of CPU cores using the `max_cores` parameter to prevent oversubscribing the available resources. " + ] + }, + { + "cell_type": "markdown", + "id": "8ced8359-8ecb-480b-966b-b85d8446d85c", + "metadata": {}, + "source": [ + "### Dependencies\n", + "Python functions with rather different computational resource requirements should not be merged into a single function. So to able to execute a series of Python functions which each depend on the output of the previous Python function executorlib internally handles the dependencies based on the [concurrent futures future](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects from the Python standard library. This implementation is independent of the selected backend and works for HPC allocation mode just like explained in the [local mode section](). 
" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "bd26d97b-46fd-4786-9ad1-1e534b31bf36", + "metadata": {}, + "outputs": [], + "source": [ + "def add_funct(a, b):\n", + " return a + b" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "1a2d440f-3cfc-4ff2-b74d-e21823c65f69", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "6\n" + ] + } + ], + "source": [ + "with Executor(backend=\"flux_allocation\", flux_executor_pmi_mode=\"pmix\") as exe:\n", + " future = 0\n", + " for i in range(1, 4):\n", + " future = exe.submit(add_funct, i, future)\n", + " print(future.result())" + ] + }, + { + "cell_type": "markdown", + "id": "f526c2bf-fdf5-463b-a955-020753138415", + "metadata": {}, + "source": [ + "### Caching\n", + "Finally, also the caching is available for HPC allocation mode, in analogy to the [local mode](). Again this functionality is not designed to identify function calls with the same parameters, but rather provides the option to reload previously cached results even after the Python processes which contained the executorlib `Executor` class is closed. As the cache is stored on the file system, this option can decrease the performance of executorlib. Consequently the caching option should primarily be used during the prototyping phase. " + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "dcba63e0-72f5-49d1-ab04-2092fccc1c47", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[2, 4, 6]\n" + ] + } + ], + "source": [ + "with Executor(\n", + " backend=\"flux_allocation\", flux_executor_pmi_mode=\"pmix\", cache_directory=\"./cache\"\n", + ") as exe:\n", + " future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n", + " print([f.result() for f in future_lst])" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "c3958a14-075b-4c10-9729-d1c559a9231c", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "['sumd1bf4ee658f1ac42924a2e4690e797f4.h5out', 'sum5171356dfe527405c606081cfbd2dffe.h5out', 'sumb6a5053f96b7031239c2e8d0e7563ce4.h5out']\n" + ] + } + ], + "source": [ + "import os\n", + "import shutil\n", + "\n", + "cache_dir = \"./cache\"\n", + "if os.path.exists(cache_dir):\n", + " print(os.listdir(cache_dir))\n", + " try:\n", + " shutil.rmtree(cache_dir)\n", + " except OSError:\n", + " pass" + ] + }, + { + "cell_type": "markdown", + "id": "c24ca82d-60bd-4fb9-a082-bf9a81e838bf", + "metadata": {}, + "source": [ + "### Nested executors\n", + "The hierarchical nature of the [flux](https://flux-framework.org/) job scheduler allows the creation of additional executorlib Executors inside the functions submitted to the Executor. This hierarchy can be beneficial to separate the logic to saturate the available computational resources. 
" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "06fb2d1f-65fc-4df6-9402-5e9837835484", + "metadata": {}, + "outputs": [], + "source": [ + "def calc_nested():\n", + " from executorlib import Executor\n", + "\n", + " with Executor(backend=\"flux_allocation\", flux_executor_pmi_mode=\"pmix\") as exe:\n", + " fs = exe.submit(sum, [1, 1])\n", + " return fs.result()" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "89b7d0fd-5978-4913-a79a-f26cc8047445", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "2\n" + ] + } + ], + "source": [ + "with Executor(backend=\"flux_allocation\", flux_executor_pmi_mode=\"pmix\") as exe:\n", + " fs = exe.submit(calc_nested)\n", + " print(fs.result())" + ] + }, + { + "cell_type": "markdown", + "id": "34a8c690-ca5a-41d1-b38f-c67eff085750", + "metadata": {}, + "source": [ + "### Resource Monitoring\n", + "For debugging it is commonly helpful to keep track of the computational resources. [flux](https://flux-framework.org/) provides a number of features to analyse the resource utilization, so here only the two most commonly used ones are introduced. Starting with the option to list all the resources available in a given allocation with the `flux resource list` command:" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "7481eb0a-a41b-4d46-bb48-b4db299fcd86", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " STATE NNODES NCORES NGPUS NODELIST\n", + " free 1 2 0 fedora\n", + " allocated 0 0 0 \n", + " down 0 0 0 \n" + ] + } + ], + "source": [ + "! flux resource list" + ] + }, + { + "cell_type": "markdown", + "id": "08d98134-a0e0-4841-be82-e09e1af29e7f", + "metadata": {}, + "source": [ + "Followed by the list of jobs which were executed in a given flux session. This can be retrieved using the `flux jobs -a` command:" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "1ee6e147-f53a-4526-8ed0-fd036f2ee6bf", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " JOBID USER NAME ST NTASKS NNODES TIME INFO\n", + "\u001b[01;32m ƒDqBpVYK jan python CD 1 1 0.695s fedora\n", + "\u001b[0;0m\u001b[01;32m ƒDxdEtYf jan python CD 1 1 0.225s fedora\n", + "\u001b[0;0m\u001b[01;32m ƒDVahzPq jan python CD 1 1 0.254s fedora\n", + "\u001b[0;0m\u001b[01;32m ƒDSsZJXH jan python CD 1 1 0.316s fedora\n", + "\u001b[0;0m\u001b[01;32m ƒDSu3Hod jan python CD 1 1 0.277s fedora\n", + "\u001b[0;0m\u001b[01;32m ƒDFbkmFD jan python CD 1 1 0.247s fedora\n", + "\u001b[0;0m\u001b[01;32m ƒD9eKeas jan python CD 1 1 0.227s fedora\n", + "\u001b[0;0m\u001b[01;32m ƒD3iNXCs jan python CD 1 1 0.224s fedora\n", + "\u001b[0;0m\u001b[01;32m ƒCoZ3P5q jan python CD 1 1 0.261s fedora\n", + "\u001b[0;0m\u001b[01;32m ƒCoXZPoV jan python CD 1 1 0.261s fedora\n", + "\u001b[0;0m\u001b[01;32m ƒCZ1URjd jan python CD 2 1 0.360s fedora\n", + "\u001b[0;0m" + ] + } + ], + "source": [ + "! flux jobs -a" + ] + }, + { + "cell_type": "markdown", + "id": "021f165b-27cc-4676-968b-cbcfd1f0210a", + "metadata": {}, + "source": [ + "## Flux\n", + "While the number of HPC clusters which use [flux](https://flux-framework.org/) as primary job scheduler is currently still limited the setup and functionality provided by executorlib for running [SLURM with flux]() also applies to HPCs which use [flux](https://flux-framework.org/) as primary job scheduler." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "04f03ebb-3f9e-4738-b9d2-5cb0db9b63c3", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.5" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/notebooks/4-developer.ipynb b/notebooks/4-developer.ipynb new file mode 100644 index 00000000..83c14282 --- /dev/null +++ b/notebooks/4-developer.ipynb @@ -0,0 +1,351 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "511b34e0-12af-4437-8915-79f033fe7cda", + "metadata": {}, + "source": [ + "# Developer\n", + "executorlib is designed to work out of the box for up-scaling Python functions and distribute them on a high performance computing (HPC) cluster. Most users should only import the `Executor` class from executorlib and should not need to use any of the internal functionality covered in this section. Still for more advanced applications beyond the submission of Python functions executorlib provides additional functionality. The functionality in this section is not officially supported and might change in future versions without further notice. " + ] + }, + { + "cell_type": "markdown", + "id": "7bc073aa-6036-48e7-9696-37af050d438a", + "metadata": {}, + "source": [ + "## Communication\n", + "The key functionality of the `executorlib` package is the up-scaling of python functions with thread based parallelism, \n", + "MPI based parallelism or by assigning GPUs to individual python functions. In the background this is realized using a \n", + "combination of the [zero message queue](https://zeromq.org) and [cloudpickle](https://github.com/cloudpipe/cloudpickle) \n", + "to communicate binary python objects. The `executorlib.standalone.interactive.communication.SocketInterface` is an abstraction of this \n", + "interface, which is used in the other classes inside `executorlib` and might also be helpful for other projects. It \n", + "comes with a series of utility functions:\n", + "\n", + "* `executorlib.standalone.interactive.communication.interface_bootup()`: To initialize the interface\n", + "* `executorlib.standalone.interactive.communication.interface_connect()`: To connect the interface to another instance\n", + "* `executorlib.standalone.interactive.communication.interface_send()`: To send messages via this interface \n", + "* `executorlib.standalone.interactive.communication.interface_receive()`: To receive messages via this interface \n", + "* `executorlib.standalone.interactive.communication.interface_shutdown()`: To shutdown the interface\n", + "\n", + "While `executorlib` was initially designed for up-scaling python functions for HPC, the same functionality can be \n", + "leveraged to up-scale any executable independent of the programming language it is developed in." + ] + }, + { + "cell_type": "markdown", + "id": "8754df33-fa95-4ca6-ae02-6669967cf4e7", + "metadata": {}, + "source": [ + "## External Executables\n", + "On extension beyond the submission of Python functions is the communication with an external executable. This could be any kind of program written in any programming language which does not provide Python bindings so it cannot be represented in Python functions. 
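To illustrate the communication pattern described in the Communication section above, the following sketch uses the public [pyzmq](https://pyzmq.readthedocs.io) and [cloudpickle](https://github.com/cloudpipe/cloudpickle) interfaces directly. It is a simplified illustration of the underlying mechanism, not the actual `SocketInterface` implementation, and both socket endpoints are kept in one process for brevity:

```python
import cloudpickle
import zmq

context = zmq.Context()

# "Server" endpoint: receives a serialized function call and replies with the
# serialized result.
server = context.socket(zmq.REP)
port = server.bind_to_random_port("tcp://127.0.0.1")

# "Client" endpoint: serializes the function call with cloudpickle and sends it.
client = context.socket(zmq.REQ)
client.connect(f"tcp://127.0.0.1:{port}")
client.send(cloudpickle.dumps({"fn": sum, "args": ([1, 1],)}))

# Execute the received task and send the result back as a binary Python object.
task = cloudpickle.loads(server.recv())
server.send(cloudpickle.dumps(task["fn"](*task["args"])))

# Receive and deserialize the result on the client side.
print(cloudpickle.loads(client.recv()))  # 2

client.close()
server.close()
context.term()
```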
" + ] + }, + { + "cell_type": "markdown", + "id": "75af1f8a-7ad7-441f-80a2-5c337484097f", + "metadata": {}, + "source": [ + "### Subprocess\n", + "If the external executable is called only once, then the call to the external executable can be represented in a Python function with the [subprocess](https://docs.python.org/3/library/subprocess.html) module of the Python standard library. In the example below the shell command `echo test` is submitted to the `execute_shell_command()` function, which itself is submitted to the `Executor` class." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "83515b16-c4d5-4b02-acd7-9e1eb57fd335", + "metadata": {}, + "outputs": [], + "source": [ + "from executorlib import Executor" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "f1ecee94-24a6-4bf9-8a3d-d50eba994367", + "metadata": {}, + "outputs": [], + "source": [ + "def execute_shell_command(\n", + " command: list, universal_newlines: bool = True, shell: bool = False\n", + "):\n", + " import subprocess\n", + "\n", + " return subprocess.check_output(\n", + " command, universal_newlines=universal_newlines, shell=shell\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "32ef5b63-3245-4336-ac0e-b4a6673ee362", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "test\n", + "\n" + ] + } + ], + "source": [ + "with Executor(backend=\"local\") as exe:\n", + " future = exe.submit(\n", + " execute_shell_command,\n", + " [\"echo\", \"test\"],\n", + " universal_newlines=True,\n", + " shell=False,\n", + " )\n", + " print(future.result())" + ] + }, + { + "cell_type": "markdown", + "id": "54837938-01e0-4dd3-b989-1133d3318929", + "metadata": {}, + "source": [ + "### Interactive\n", + "The more complex case is the interaction with an external executable during the run time of the executable. This can be implemented with executorlib using the block allocation `block_allocation=True` feature. The external executable is started as part of the initialization function `init_function` and then the indivdual functions submitted to the `Executor` class interact with the process which is connected to the external executable. \n", + "\n", + "Starting with the definition of the executable, in this example it is a simple script which just increases a counter. The script is written in the file `count.py` so it behaves like an external executable, which could also use any other progamming language. " + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "dedf138f-3003-4a91-9f92-03983ac7de08", + "metadata": {}, + "outputs": [], + "source": [ + "count_script = \"\"\"\\\n", + "def count(iterations):\n", + " for i in range(int(iterations)):\n", + " print(i)\n", + " print(\"done\")\n", + "\n", + "\n", + "if __name__ == \"__main__\":\n", + " while True:\n", + " user_input = input()\n", + " if \"shutdown\" in user_input:\n", + " break\n", + " else:\n", + " count(iterations=int(user_input))\n", + "\"\"\"\n", + "\n", + "with open(\"count.py\", \"w\") as f:\n", + " f.writelines(count_script)" + ] + }, + { + "cell_type": "markdown", + "id": "771b5b84-48f0-4989-a2c8-c8dcb4462781", + "metadata": {}, + "source": [ + "The connection to the external executable is established in the initialization function `init_function` of the `Executor` class. By using the [subprocess](https://docs.python.org/3/library/subprocess.html) module from the standard library two process pipes are created to communicate with the external executable. 
One process pipe is connected to the standard input `stdin` and the other is connected to the standard output `stdout`. " + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "8fe76668-0f18-40b7-9719-de47dacb0911", + "metadata": {}, + "outputs": [], + "source": [ + "def init_process():\n", + " import subprocess\n", + "\n", + " return {\n", + " \"process\": subprocess.Popen(\n", + " [\"python\", \"count.py\"],\n", + " stdin=subprocess.PIPE,\n", + " stdout=subprocess.PIPE,\n", + " universal_newlines=True,\n", + " shell=False,\n", + " )\n", + " }" + ] + }, + { + "cell_type": "markdown", + "id": "09dde7a1-2b43-4be7-ba36-38200b9fddf0", + "metadata": {}, + "source": [ + "The interaction function handles the data conversion from the Python datatypes to the strings which can be communicated to the external executable. It is important to always add a new line `\\n` to each command send via the standard input `stdin` to the external executable and afterwards flush the pipe by calling `flush()` on the standard input pipe `stdin`. " + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "7556f2bd-176f-4275-a87d-b5c940267888", + "metadata": {}, + "outputs": [], + "source": [ + "def interact(shell_input, process, lines_to_read=None, stop_read_pattern=None):\n", + " process.stdin.write(shell_input)\n", + " process.stdin.flush()\n", + " lines_count = 0\n", + " output = \"\"\n", + " while True:\n", + " output_current = process.stdout.readline()\n", + " output += output_current\n", + " lines_count += 1\n", + " if stop_read_pattern is not None and stop_read_pattern in output_current:\n", + " break\n", + " elif lines_to_read is not None and lines_to_read == lines_count:\n", + " break\n", + " return output" + ] + }, + { + "cell_type": "markdown", + "id": "5484b98b-546f-4f2c-8db1-919ce215e228", + "metadata": {}, + "source": [ + "Finally, to close the process after the external executable is no longer required it is recommended to define a shutdown function, which communicates to the external executable that it should shutdown. In the case of the `count.py` script defined above this is achieved by sending the keyword `shutdown`. " + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "d5344d2b-cb53-4d38-8cae-621e3b98bb56", + "metadata": {}, + "outputs": [], + "source": [ + "def shutdown(process):\n", + " process.stdin.write(\"shutdown\\n\")\n", + " process.stdin.flush()" + ] + }, + { + "cell_type": "markdown", + "id": "3899467c-dc54-41cb-b05e-b60f5cf97e46", + "metadata": {}, + "source": [ + "With these utility functions is to possible to communicate with any kind of external executable. Still for the specific implementation of the external executable it might be necessary to adjust the corresponding Python functions. Therefore this functionality is currently limited to developers and not considered a general feature of executorlib. 
" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "747c1b78-4804-467b-9ac8-8144d8031da3", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "0\n", + "1\n", + "2\n", + "3\n", + "done\n", + "\n", + "None\n" + ] + } + ], + "source": [ + "with Executor(\n", + " max_workers=1,\n", + " init_function=init_process,\n", + " block_allocation=True,\n", + ") as exe:\n", + " future = exe.submit(\n", + " interact, shell_input=\"4\\n\", lines_to_read=5, stop_read_pattern=None\n", + " )\n", + " print(future.result())\n", + " future_shutdown = exe.submit(shutdown)\n", + " print(future_shutdown.result())" + ] + }, + { + "cell_type": "markdown", + "id": "96e56af9-3031-4d7b-9111-d2d031a0a6e4", + "metadata": {}, + "source": [ + "## License\n", + "```\n", + "BSD 3-Clause License\n", + "\n", + "Copyright (c) 2022, Jan Janssen\n", + "All rights reserved.\n", + "\n", + "Redistribution and use in source and binary forms, with or without\n", + "modification, are permitted provided that the following conditions are met:\n", + "\n", + "* Redistributions of source code must retain the above copyright notice, this\n", + " list of conditions and the following disclaimer.\n", + "\n", + "* Redistributions in binary form must reproduce the above copyright notice,\n", + " this list of conditions and the following disclaimer in the documentation\n", + " and/or other materials provided with the distribution.\n", + "\n", + "* Neither the name of the copyright holder nor the names of its\n", + " contributors may be used to endorse or promote products derived from\n", + " this software without specific prior written permission.\n", + "\n", + "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS \"AS IS\"\n", + "AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE\n", + "IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE\n", + "DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE\n", + "FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL\n", + "DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR\n", + "SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER\n", + "CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,\n", + "OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n", + "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "c2522d54-a00b-49ae-81e4-69e8fa05c9c3", + "metadata": {}, + "source": [ + "## Modules\n", + "While it is not recommended to link to specific internal components of executorlib in external Python packages but rather only the `Executor` class should be used as central interface to executorlib, the internal architecture is briefly outlined below. \n", + "* `backend` - the backend module contains the functionality for the Python processes created by executorlib to execute the submitted Python functions.\n", + "* `base` - the base module contains the definition of the executorlib `ExecutorBase` class which is internally used to create the different interfaces. 
To compare if an given `Executor` class is based on executorlib compare with the `ExecutorBase` class which can be imported as `from executorlib.base.executor import ExecutorBase`.\n", + "* `cache` - the cache module defines the file based communication for the [HPC submission mode]().\n", + "* `interactive` - the interactive modules defines the [zero message queue](https://zeromq.org) based communication for the [local mode]() and the [HPC allocation mode]().\n", + "* `standalone` - the standalone module contains a number of utility functions which only depend on external libraries and do not have any internal dependency to other parts of `executorlib`. This includes the functionality to generate executable commands, the [h5py](https://www.h5py.org) based interface for caching, a number of input checks, routines to plot the dependencies of a number of future objects, functionality to interact with the [queues defined in the Python standard library](https://docs.python.org/3/library/queue.html), the interface for serialization based on [cloudpickle](https://github.com/cloudpipe/cloudpickle) and finally an extension to the [threading](https://docs.python.org/3/library/threading.html) of the Python standard library.\n", + "\n", + "Given the level of separation the integration of submodules from the standalone module in external software packages should be the easiest way to benefit from the developments in executorlib beyond just using the `Executor` class. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "39096340-f169-4438-b9c6-90c48ea37e4d", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.5" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/notebooks/examples.ipynb b/notebooks/examples.ipynb deleted file mode 100644 index 1546dcb8..00000000 --- a/notebooks/examples.ipynb +++ /dev/null @@ -1,771 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "id": "c31c95fe-9af4-42fd-be2c-713afa380e09", - "metadata": {}, - "source": [ - "# Examples\n", - "The `executorlib.Executor` extends the interface of the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\n", - "to simplify the up-scaling of individual functions in a given workflow." - ] - }, - { - "cell_type": "markdown", - "id": "a1c6370e-7c8a-4da2-ac7d-42a36e12b27c", - "metadata": {}, - "source": "## Compatibility\nStarting with the basic example of `1+1=2`. 
With the `ThreadPoolExecutor` from the [`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nstandard library this can be written as: " - }, - { - "cell_type": "code", - "execution_count": 1, - "id": "8b663009-60af-4d71-8ef3-2e9c6cd79cce", - "metadata": { - "trusted": true - }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": "2\n" - } - ], - "source": [ - "from concurrent.futures import ThreadPoolExecutor\n", - "\n", - "with ThreadPoolExecutor(max_workers=1) as exe:\n", - " future = exe.submit(sum, [1, 1])\n", - " print(future.result())" - ] - }, - { - "cell_type": "markdown", - "id": "56192fa7-bbd6-43fe-8598-ff764addfbac", - "metadata": {}, - "source": "In this case `max_workers=1` limits the number of threads used by the `ThreadPoolExecutor` to one. Then the `sum()`\nfunction is submitted to the executor with a list with two ones `[1, 1]` as input. A [`concurrent.futures.Future`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nobject is returned. The `Future` object allows to check the status of the execution with the `done()` method which \nreturns `True` or `False` depending on the state of the execution. Or the main process can wait until the execution is \ncompleted by calling `result()`. \n\nThe result of the calculation is `1+1=2`. " - }, - { - "cell_type": "markdown", - "id": "99aba5f3-5667-450c-b31f-2b53918b1896", - "metadata": {}, - "source": [ - "The `executorlib.Executor` class extends the interface of the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\n", - "class by providing more parameters to specify the level of parallelism. In addition, to specifying the maximum number \n", - "of workers `max_workers` the user can also specify the number of cores per worker `cores_per_worker` for MPI based \n", - "parallelism, the number of threads per core `threads_per_core` for thread based parallelism and the number of GPUs per\n", - "worker `gpus_per_worker`. Finally, for those backends which support over-subscribing this can also be enabled using the \n", - "`oversubscribe` parameter. All these parameters are optional, so the `executorlib.Executor` can be used as a drop-in\n", - "replacement for the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures).\n", - "\n", - "The previous example is rewritten for the `executorlib.Executor` in:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "2ed59582cab0eb29", - "metadata": {}, - "outputs": [], - "source": [ - "from executorlib import Executor\n", - "\n", - "with Executor(max_cores=1, backend=\"flux_allocation\") as exe:\n", - " future = exe.submit(sum, [1, 1])\n", - " print(future.result())" - ] - }, - { - "cell_type": "markdown", - "id": "e1ae417273ebf0f5", - "metadata": {}, - "source": "The result of the calculation is again `1+1=2`." - }, - { - "cell_type": "markdown", - "id": "bcf8a85c015d55da", - "metadata": {}, - "source": [ - "Beyond pre-defined functions like the `sum()` function, the same functionality can be used to submit user-defined \n", - "functions. 
In the next example a custom summation function is defined:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "70ff8c30cc13bfd5", - "metadata": {}, - "outputs": [], - "source": [ - "from executorlib import Executor\n", - "\n", - "\n", - "def calc(*args):\n", - " return sum(*args)\n", - "\n", - "\n", - "with Executor(max_cores=2, backend=\"flux_allocation\") as exe:\n", - " fs_1 = exe.submit(calc, [2, 1])\n", - " fs_2 = exe.submit(calc, [2, 2])\n", - " fs_3 = exe.submit(calc, [2, 3])\n", - " fs_4 = exe.submit(calc, [2, 4])\n", - " print(\n", - " [\n", - " fs_1.result(),\n", - " fs_2.result(),\n", - " fs_3.result(),\n", - " fs_4.result(),\n", - " ]\n", - " )" - ] - }, - { - "cell_type": "markdown", - "id": "495e6e17964fe936", - "metadata": {}, - "source": [ - "In contrast to the previous example where just a single function was submitted to a single worker, in this case a total\n", - "of four functions is submitted to a group of two workers `max_cores=2`. Consequently, the functions are executed as a\n", - "set of two pairs.\n", - "\n", - "It returns the corresponding sums as expected. The same can be achieved with the built-in [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\n", - "classes. Still one advantage of using the `executorlib.Executor` rather than the built-in ones, is the ability to execute\n", - "the same commands in interactive environments like [Jupyter notebooks](https://jupyter.org). This is achieved by using \n", - "[cloudpickle](https://github.com/cloudpipe/cloudpickle) to serialize the python function and its parameters rather than\n", - "the regular pickle package." - ] - }, - { - "cell_type": "markdown", - "id": "7f13ea3733327ff8", - "metadata": {}, - "source": [ - "For backwards compatibility with the [`multiprocessing.Pool`](https://docs.python.org/3/library/multiprocessing.html) \n", - "class the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\n", - "also implements the `map()` function to map a series of inputs to a function. The same `map()` function is also \n", - "available in the `executorlib.Executor`:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "c320897f8c44f364", - "metadata": {}, - "outputs": [], - "source": [ - "from executorlib import Executor\n", - "\n", - "\n", - "def calc(*args):\n", - " return sum(*args)\n", - "\n", - "\n", - "with Executor(max_cores=2, backend=\"flux_allocation\") as exe:\n", - " print(list(exe.map(calc, [[2, 1], [2, 2], [2, 3], [2, 4]])))" - ] - }, - { - "cell_type": "markdown", - "id": "6a22677b67784c97", - "metadata": {}, - "source": "The results remain the same. " - }, - { - "cell_type": "markdown", - "id": "240ad1f5dc0c43c2", - "metadata": {}, - "source": [ - "## Resource Assignment\n", - "By default, every submission of a python function results in a flux job (or SLURM job step) depending on the backend. \n", - "This is sufficient for function calls which take several minutes or longer to execute. For python functions with shorter \n", - "run-time `executorlib` provides block allocation (enabled by the `block_allocation=True` parameter) to execute multiple\n", - "python functions with similar resource requirements in the same flux job (or SLURM job step). \n", - "\n", - "The following example illustrates the resource definition on both level. This is redundant. 
For block allocations the \n", - "resources have to be configured on the **Executor level**, otherwise it can either be defined on the **Executor level**\n", - "or on the **Submission level**. The resource defined on the **Submission level** overwrite the resources defined on the \n", - "**Executor level**." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "631422e52b7f8b1d", - "metadata": {}, - "outputs": [], - "source": [ - "import flux.job\n", - "from executorlib import Executor\n", - "\n", - "\n", - "def calc_function(parameter_a, parameter_b):\n", - " return parameter_a + parameter_b\n", - "\n", - "\n", - "with flux.job.FluxExecutor() as flux_exe:\n", - " with Executor(\n", - " # Resource definition on the executor level\n", - " max_workers=2, # total number of cores available to the Executor\n", - " backend=\"flux_allocation\", # optional in case the backend is not recognized\n", - " # Optional resource definition\n", - " resource_dict={\n", - " \"cores\": 1,\n", - " \"threads_per_core\": 1,\n", - " \"gpus_per_core\": 0,\n", - " \"cwd\": \"/home/jovyan/notebooks\",\n", - " \"openmpi_oversubscribe\": False,\n", - " \"slurm_cmd_args\": [],\n", - " },\n", - " flux_executor=flux_exe,\n", - " flux_executor_pmi_mode=None,\n", - " flux_executor_nesting=False,\n", - " hostname_localhost=None, # only required on MacOS\n", - " block_allocation=False, # reuse existing processes with fixed resources\n", - " init_function=None, # only available with block_allocation=True\n", - " disable_dependencies=False, # disable dependency check for faster execution\n", - " refresh_rate=0.01, # for refreshing the dependencies\n", - " plot_dependency_graph=False, # visualize dependencies for debugging\n", - " ) as exe:\n", - " future_obj = exe.submit(\n", - " calc_function,\n", - " 1, # parameter_a\n", - " parameter_b=2,\n", - " # Resource definition on the submission level - optional\n", - " resource_dict={\n", - " \"cores\": 1,\n", - " \"threads_per_core\": 1,\n", - " \"gpus_per_core\": 0, # here it is gpus_per_core rather than gpus_per_worker\n", - " \"cwd\": \"/home/jovyan/notebooks\",\n", - " \"openmpi_oversubscribe\": False,\n", - " # \"slurm_cmd_args\": [], # additional command line arguments for SLURM\n", - " \"flux_executor\": flux_exe,\n", - " \"flux_executor_pmi_mode\": None,\n", - " \"flux_executor_nesting\": False,\n", - " \"hostname_localhost\": None, # only required on MacOS\n", - " },\n", - " )\n", - " print(future_obj.result())" - ] - }, - { - "cell_type": "markdown", - "id": "ab12ff4ebd5efb98", - "metadata": {}, - "source": [ - "The `max_cores` which defines the total number of cores of the allocation, is the only mandatory parameter. All other\n", - "resource parameters are optional. If none of the submitted Python function uses [mpi4py](https://mpi4py.readthedocs.io)\n", - "or any GPU, then the resources can be defined on the **Executor level** as: `cores_per_worker=1`, `threads_per_core=1` \n", - "and `gpus_per_worker=0`. These are defaults, so they do even have to be specified. In this case it also makes sense to \n", - "enable `block_allocation=True` to continuously use a fixed number of python processes rather than creating a new python\n", - "process for each submission. 
In this case the above example can be reduced to: " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "efe054c93d835e4a", - "metadata": {}, - "outputs": [], - "source": [ - "from executorlib import Executor\n", - "\n", - "\n", - "def calc_function(parameter_a, parameter_b):\n", - " return parameter_a + parameter_b\n", - "\n", - "\n", - "with Executor(\n", - " # Resource definition on the executor level\n", - " max_cores=2, # total number of cores available to the Executor\n", - " block_allocation=True, # reuse python processes\n", - " backend=\"flux_allocation\",\n", - ") as exe:\n", - " future_obj = exe.submit(\n", - " calc_function,\n", - " 1, # parameter_a\n", - " parameter_b=2,\n", - " )\n", - " print(future_obj.result())" - ] - }, - { - "cell_type": "markdown", - "id": "c6983f28b18f831b", - "metadata": {}, - "source": [ - "The working directory parameter `cwd` can be helpful for tasks which interact with the file system to define which task\n", - "is executed in which folder, but for most python functions it is not required." - ] - }, - { - "cell_type": "markdown", - "id": "3bf7af3ce2388f75", - "metadata": {}, - "source": [ - "## Data Handling\n", - "A limitation of many parallel approaches is the overhead in communication when working with large datasets. Instead of\n", - "reading the same dataset repetitively, the `executorlib.Executor` in block allocation mode (`block_allocation=True`) loads the dataset only once per worker and afterwards\n", - "each function submitted to this worker has access to the dataset, as it is already loaded in memory. To achieve this\n", - "the user defines an initialization function `init_function` which returns a dictionary with one key per dataset. The \n", - "keys of the dictionary can then be used as additional input parameters in each function submitted to the `executorlib.Executor`. When block allocation is disabled this functionality is not available, as each function is executed in a separate process, so no data can be preloaded.\n", - "\n", - "This functionality is illustrated below: " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "74552573e3e3d3d9", - "metadata": {}, - "outputs": [], - "source": [ - "from executorlib import Executor\n", - "\n", - "\n", - "def calc(i, j, k):\n", - " return i + j + k\n", - "\n", - "\n", - "def init_function():\n", - " return {\"j\": 4, \"k\": 3, \"l\": 2}\n", - "\n", - "\n", - "with Executor(\n", - " max_cores=1,\n", - " init_function=init_function,\n", - " backend=\"flux_allocation\",\n", - " block_allocation=True,\n", - ") as exe:\n", - " fs = exe.submit(calc, 2, j=5)\n", - " print(fs.result())" - ] - }, - { - "cell_type": "markdown", - "id": "c71bc876a65349cf", - "metadata": {}, - "source": [ - "The function `calc()` requires three inputs `i`, `j` and `k`. But when the function is submitted to the executor only \n", - "two inputs are provided `fs = exe.submit(calc, 2, j=5)`. In this case the first input parameter is mapped to `i=2`, the\n", - "second input parameter is specified explicitly `j=5` but the third input parameter `k` is not provided. So the \n", - "`executorlib.Executor` automatically checks the keys set in the `init_function()` function. In this case the returned\n", - "dictionary `{\"j\": 4, \"k\": 3, \"l\": 2}` defines `j=4`, `k=3` and `l=2`. 
For this specific call of the `calc()` function,\n", - "`i` and `j` are already provided during the submission, so the `j=4` from the `init_function()` is not used, while `k=3` is taken from the `init_function()` and, as the `calc()`\n", - "function does not define an `l` parameter, the `l=2` is ignored. \n", - "\n", - "The result is `2+5+3=10` as `i=2` and `j=5` are provided during the submission and `k=3` is defined in the `init_function()`\n", - "function." - ] - }, - { - "cell_type": "markdown", - "id": "a4d4d5447e68a834", - "metadata": {}, - "source": [ - "## Up-Scaling \n", - "[flux](https://flux-framework.org) provides fine-grained resource assignment via `libhwloc` and `pmi`." - ] - }, - { - "cell_type": "markdown", - "id": "ad6fec651dfbc263", - "metadata": {}, - "source": [ - "### Thread-based Parallelism\n", - "The number of threads per core can be controlled with the `threads_per_core` parameter during the initialization of the \n", - "`executorlib.Executor`. Unfortunately, there is no uniform way to control the number of cores a given underlying library\n", - "uses for thread-based parallelism, so it might be necessary to set certain environment variables manually: \n", - "\n", - "* `OMP_NUM_THREADS`: for OpenMP\n", - "* `OPENBLAS_NUM_THREADS`: for OpenBLAS\n", - "* `MKL_NUM_THREADS`: for MKL\n", - "* `VECLIB_MAXIMUM_THREADS`: for Accelerate on macOS\n", - "* `NUMEXPR_NUM_THREADS`: for NumExpr\n", - "\n", - "At the current stage `executorlib.Executor` does not set these parameters itself, so you have to set them inside the function\n", - "you submit, before importing the corresponding library: \n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "1fbcc6242f13973b", - "metadata": {}, - "outputs": [], - "source": [ - "def calc(i):\n", - " import os\n", - "\n", - " os.environ[\"OMP_NUM_THREADS\"] = \"2\"\n", - " os.environ[\"OPENBLAS_NUM_THREADS\"] = \"2\"\n", - " os.environ[\"MKL_NUM_THREADS\"] = \"2\"\n", - " os.environ[\"VECLIB_MAXIMUM_THREADS\"] = \"2\"\n", - " os.environ[\"NUMEXPR_NUM_THREADS\"] = \"2\"\n", - " import numpy as np\n", - "\n", - " return i" - ] - }, - { - "cell_type": "markdown", - "id": "aadd8aa9902d854e", - "metadata": {}, - "source": [ - "Most modern CPUs use hyper-threading to present the operating system with double the number of virtual cores compared to\n", - "the number of physical cores available. So unless this functionality is disabled, `threads_per_core=2` is a reasonable \n", - "default. Just be careful: if the number of threads is not specified, all workers may try to access all \n", - "cores at the same time, which can lead to poor performance. So it is typically a good idea to monitor the CPU utilization\n", - "with an increasing number of workers. \n", - "\n", - "Specific manycore CPU models like the Intel Xeon Phi processors provide a much higher hyper-threading ratio and require\n", - "a higher number of threads per core for optimal performance. \n" - ] - },
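To connect the `threads_per_core` setting with the environment variables set inside the submitted function, a minimal sketch could look like the following; the `resource_dict` keys mirror the resource definition example earlier in this notebook, and the value `2` is purely illustrative:

```python
from executorlib import Executor


def calc(i):
    import os

    # limit the thread count of the underlying library before importing it
    os.environ["OMP_NUM_THREADS"] = "2"
    import numpy as np  # imported only after the thread limit is set

    return i


with Executor(
    max_cores=2,
    resource_dict={"threads_per_core": 2},  # two threads assigned per core
    backend="flux_allocation",
) as exe:
    fs = exe.submit(calc, 3)
    print(fs.result())
```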
- { - "cell_type": "markdown", - "id": "d19861a257e40fc3", - "metadata": {}, - "source": [ - "### MPI Parallel Python Functions\n", - "Beyond thread-based parallelism, the message passing interface (MPI) is the de facto standard for parallel execution in \n", - "scientific computing, and the [`mpi4py`](https://mpi4py.readthedocs.io) bindings to the MPI libraries are commonly used\n", - "to parallelize existing workflows. The limitation of this approach is that it requires the whole code to adopt the MPI\n", - "communication standards to coordinate how information is distributed. Just like the `executorlib.Executor` the\n", - "[`mpi4py.futures.MPIPoolExecutor`](https://mpi4py.readthedocs.io/en/stable/mpi4py.futures.html#mpipoolexecutor) \n", - "implements the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\n", - "interface. Still, in this case each Python function submitted to the executor is limited to serial execution. The\n", - "novel approach of the `executorlib.Executor` is mixing these two types of parallelism. Individual functions can use\n", - "the [`mpi4py`](https://mpi4py.readthedocs.io) library to handle the parallel execution within the context of this \n", - "function while these functions can still be submitted to the `executorlib.Executor` just like any other function. The\n", - "advantage of this approach is that users can parallelize their workflows one function at a time. \n", - "\n", - "The example in `test_mpi.py` illustrates the submission of a simple MPI parallel python function: " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "e00d8448d882dfd5", - "metadata": {}, - "outputs": [], - "source": [ - "from executorlib import Executor\n", - "\n", - "\n", - "def calc(i):\n", - " from mpi4py import MPI\n", - "\n", - " size = MPI.COMM_WORLD.Get_size()\n", - " rank = MPI.COMM_WORLD.Get_rank()\n", - " return i, size, rank\n", - "\n", - "\n", - "with Executor(\n", - " max_cores=2,\n", - " resource_dict={\"cores\": 2},\n", - " backend=\"flux_allocation\",\n", - " flux_executor_pmi_mode=\"pmix\",\n", - ") as exe:\n", - " fs = exe.submit(calc, 3)\n", - " print(fs.result())" - ] - }, - { - "cell_type": "markdown", - "id": "35c49013c2de3907", - "metadata": {}, - "source": [ - "In the example environment OpenMPI version 5 is used, so the `pmi` parameter has to be set to `pmix` rather than `pmi1` or `pmi2`, which is the default. For `mpich` it is not necessary to specify the `pmi` interface manually.\n", - "The `calc()` function initializes the [`mpi4py`](https://mpi4py.readthedocs.io) library and gathers the size of the \n", - "allocation and the rank of the current process within the MPI allocation. This function is then submitted to an \n", - "`executorlib.Executor` which is initialized with a single worker with two cores (`resource_dict={\"cores\": 2}`). So each function\n", - "call is going to have access to two cores. \n", - "\n", - "Just like before, the script can be called with any Python interpreter. Even though it uses the [`mpi4py`](https://mpi4py.readthedocs.io)\n", - "library in the background, it is not necessary to execute the script with `mpiexec` or `mpirun`.\n", - "\n", - "The response consists of a list of two tuples, one for each MPI parallel process, with the first entry of the tuple \n", - "being the parameter `i=3`, followed by the number of MPI parallel processes assigned to the function call (two in this case)\n", - "and finally the index of the specific process `0` or `1`. " - ] - }, - { - "cell_type": "markdown", - "id": "6960ccc01268e1f7", - "metadata": {}, - "source": [ - "### GPU Assignment\n", - "With the rise of machine learning applications, the use of GPUs for scientific applications becomes more and more popular.\n", - "Consequently, it is essential to have full control over the assignment of GPUs to specific python functions. 
In the \n", - "`test_gpu.py` example the `tensorflow` library is used to identify the GPUs and return their configuration: " - ] - }, - { - "cell_type": "markdown", - "id": "db3727c5da7072cd", - "metadata": {}, - "source": [ - "```\n", - "import socket\n", - "from executorlib import Executor\n", - "from tensorflow.python.client import device_lib\n", - "\n", - "def get_available_gpus():\n", - " local_device_protos = device_lib.list_local_devices()\n", - " return [\n", - " (x.name, x.physical_device_desc, socket.gethostname()) \n", - " for x in local_device_protos if x.device_type == 'GPU'\n", - " ]\n", - "\n", - "with Executor(\n", - " max_workers=2, \n", - " gpus_per_worker=1,\n", - " backend=\"flux_allocation\",\n", - ") as exe:\n", - " fs_1 = exe.submit(get_available_gpus)\n", - " fs_2 = exe.submit(get_available_gpus)\n", - " print(fs_1.result(), fs_2.result())\n", - "```" - ] - }, - { - "cell_type": "markdown", - "id": "e7ccb6c390b33c73", - "metadata": {}, - "source": [ - "The additional parameter `gpus_per_worker=1` specifies that one GPU is assigned to each worker. This functionality \n", - "requires `executorlib` to be connected to a resource manager like the [SLURM workload manager](https://www.schedmd.com)\n", - "or preferably the [flux framework](https://flux-framework.org). The rest of the script follows the previous examples, \n", - "as two functions are submitted and the results are printed. \n", - "\n", - "To clarify the execution of such an example on a high performance computing (HPC) cluster using the [SLURM workload manager](https://www.schedmd.com)\n", - "the submission script is given below: " - ] - }, - { - "cell_type": "markdown", - "id": "8aa7df69d42b5b74", - "metadata": {}, - "source": [ - "```\n", - "#!/bin/bash\n", - "#SBATCH --nodes=2\n", - "#SBATCH --gpus-per-node=1\n", - "#SBATCH --get-user-env=L\n", - "\n", - "python test_gpu.py\n", - "```" - ] - }, - { - "cell_type": "markdown", - "id": "8a6636284ba16750", - "metadata": {}, - "source": [ - "The important part is that for using the `executorlib.slurm.PySlurmExecutor` backend the script `test_gpu.py` does not\n", - "need to be executed with `srun` but rather it is sufficient to just execute it with the python interpreter. `executorlib`\n", - "internally calls `srun` to assign the individual resources to a given worker. \n", - "\n", - "For the more complex setup of running the [flux framework](https://flux-framework.org) as a secondary resource scheduler\n", - "within the [SLURM workload manager](https://www.schedmd.com) it is essential that the resources are passed from the \n", - "[SLURM workload manager](https://www.schedmd.com) to the [flux framework](https://flux-framework.org). 
This is achieved\n", - "by calling `srun flux start` in the submission script: " - ] - }, - { - "cell_type": "markdown", - "id": "888454c1532ad432", - "metadata": {}, - "source": [ - "```\n", - "#!/bin/bash\n", - "#SBATCH --nodes=2\n", - "#SBATCH --gpus-per-node=1\n", - "#SBATCH --get-user-env=L\n", - "\n", - "srun flux start python test_gpu.py\n", - "```" - ] - }, - { - "cell_type": "markdown", - "id": "d1285038563eee32", - "metadata": {}, - "source": [ - "As a result, the GPUs available on the two compute nodes are reported: \n", - "```\n", - ">>> [('/device:GPU:0', 'device: 0, name: Tesla V100S-PCIE-32GB, pci bus id: 0000:84:00.0, compute capability: 7.0', 'cn138'),\n", - ">>> ('/device:GPU:0', 'device: 0, name: Tesla V100S-PCIE-32GB, pci bus id: 0000:84:00.0, compute capability: 7.0', 'cn139')]\n", - "```\n", - "In this case each compute node `cn138` and `cn139` is equipped with one `Tesla V100S-PCIE-32GB`.\n" - ] - }, - { - "cell_type": "markdown", - "id": "df3ff4f3c9ee10b8", - "metadata": {}, - "source": [ - "## Coupled Functions \n", - "For submitting two functions with rather different computing resource requirements it is essential to represent this \n", - "dependence during the submission process. In `executorlib` this can be achieved by leveraging the separate submission of\n", - "individual python functions and including the `concurrent.futures.Future` object of the first submitted function as \n", - "input for the second function during the submission. Consequently, this functionality can be used for directed acyclic \n", - "graphs, but it does not enable cyclic graphs. As a simple example we can add one to the result of the addition of one\n", - "and two:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "1dbc77aadc5b6ed0", - "metadata": {}, - "outputs": [], - "source": [ - "from executorlib import Executor\n", - "\n", - "\n", - "def calc_function(parameter_a, parameter_b):\n", - " return parameter_a + parameter_b\n", - "\n", - "\n", - "with Executor(max_cores=2, backend=\"flux_allocation\") as exe:\n", - " future_1 = exe.submit(\n", - " calc_function,\n", - " 1,\n", - " parameter_b=2,\n", - " resource_dict={\"cores\": 1},\n", - " )\n", - " future_2 = exe.submit(\n", - " calc_function,\n", - " 1,\n", - " parameter_b=future_1,\n", - " resource_dict={\"cores\": 1},\n", - " )\n", - " print(future_2.result())" - ] - }, - { - "cell_type": "markdown", - "id": "bd3e6eea-3a77-49ec-8fec-d88274aeeda5", - "metadata": {}, - "source": "Here the first addition `1+2` is computed and the output `3` is returned as the result of `future_1.result()`. Even \nbefore the computation of this addition is completed, the next addition is already submitted, using the future object\n`future_1` as an input and adding `1`. The result of both additions is `4` as `1+2+1=4`. \n\nTo disable this functionality the parameter `disable_dependencies=True` can be set on the executor level. Still, at the\ncurrent stage the performance improvement of disabling this functionality seems to be minimal. Furthermore, this \nfunctionality introduces the `refresh_rate=0.01` parameter, which defines in seconds how frequently the \nqueue of submitted functions is queried. Typically, there is no need to change these default parameters. 
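A minimal sketch of the `disable_dependencies=True` parameter mentioned above is shown below; since the dependency check is disabled, both submissions are kept independent and no `Future` object is passed as input (parameter names follow the earlier examples):

```python
from executorlib import Executor


def calc_function(parameter_a, parameter_b):
    return parameter_a + parameter_b


with Executor(
    max_cores=2,
    backend="flux_allocation",
    disable_dependencies=True,  # skip the dependency check of submitted futures
) as exe:
    future_1 = exe.submit(calc_function, 1, parameter_b=2)
    future_2 = exe.submit(calc_function, 1, parameter_b=3)
    # without the dependency check each submission has to be independent
    print([future_1.result(), future_2.result()])
```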
" - }, - { - "cell_type": "markdown", - "id": "d1086337-5291-4e06-96d1-a6e162d28c58", - "metadata": {}, - "source": [ - "## SLURM Job Scheduler\n", - "Using `executorlib` without the [flux framework](https://flux-framework.org) results in one `srun` call per worker in\n", - "`block_allocation=True` mode and one `srun` call per submitted function in `block_allocation=False` mode. As each `srun`\n", - "call represents a request to the central database of SLURM this can drastically reduce the performance, especially for\n", - "large numbers of small python functions. That is why the hierarchical job scheduler [flux framework](https://flux-framework.org)\n", - "is recommended as secondary job scheduler even within the context of the SLURM job manager. \n", - "\n", - "Still the general usage of `executorlib` remains similar even with SLURM as backend:" - ] - }, - { - "cell_type": "markdown", - "id": "27569937-7d99-4697-b3ee-f68c43b95a10", - "metadata": {}, - "source": [ - "```\n", - "from executorlib import Executor\n", - "\n", - "with Executor(max_cores=1, backend=\"slurm_allocation\") as exe:\n", - " future = exe.submit(sum, [1,1])\n", - " print(future.result())\n", - "```" - ] - }, - { - "cell_type": "markdown", - "id": "ae8dd860-f90f-47b4-b3e5-664f5c949350", - "metadata": {}, - "source": [ - "The `backend=\"slurm_allocation\"` parameter is optional as `executorlib` automatically recognizes if [flux framework](https://flux-framework.org)\n", - "or SLURM are available. \n", - "\n", - "In addition, the SLURM backend introduces the `command_line_argument_lst=[]` parameter, which allows the user to provide\n", - "a list of command line arguments for the `srun` command. " - ] - }, - { - "cell_type": "markdown", - "id": "449d2c7a-67ba-449e-8e0b-98a228707e1c", - "metadata": {}, - "source": [ - "## Workstation Support\n", - "While the high performance computing (HPC) setup is limited to the Linux operating system, `executorlib` can also be used\n", - "in combination with MacOS and Windows. These setups are limited to a single compute node. \n", - "\n", - "Still the general usage of `executorlib` remains similar:" - ] - }, - { - "cell_type": "code", - "execution_count": 11, - "id": "fa147b3b-61df-4884-b90c-544362bc95d9", - "metadata": { - "trusted": true - }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": "2\n" - } - ], - "source": [ - "from executorlib import Executor\n", - "\n", - "with Executor(max_cores=1, backend=\"local\") as exe:\n", - " future = exe.submit(sum, [1, 1], resource_dict={\"cores\": 1})\n", - " print(future.result())" - ] - }, - { - "cell_type": "markdown", - "id": "0370b42d-237b-4169-862a-b0bac4bb858b", - "metadata": {}, - "source": [ - "The `backend=\"local\"` parameter is optional as `executorlib` automatically recognizes if [flux framework](https://flux-framework.org)\n", - "or SLURM are available. \n", - "\n", - "Workstations, especially workstations with MacOs can have rather strict firewall settings. This includes limiting the\n", - "look up of hostnames and communicating with itself via their own hostname. To directly connect to `localhost` rather\n", - "than using the hostname which is the default for distributed systems, the `hostname_localhost=True` parameter is \n", - "introduced." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "601852447d3839c4", - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Flux", - "language": "python", - "name": "flux" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.12.3" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/pyproject.toml b/pyproject.toml index ddbdc55e..a312b9b5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,15 +36,27 @@ Documentation = "https://executorlib.readthedocs.io" Repository = "https://github.com/pyiron/executorlib" [project.optional-dependencies] -mpi = ["mpi4py==4.0.1"] -hdf = ["h5py==3.12.1"] +cache = ["h5py==3.12.1"] graph = [ "pygraphviz==1.14", "matplotlib==3.9.2", "networkx==3.4.2", "ipython==8.29.0", ] -queue = ["pysqa==0.2.2"] +mpi = ["mpi4py==4.0.1"] +submission = [ + "pysqa==0.2.2", + "h5py==3.12.1", +] +all = [ + "mpi4py==4.0.1", + "pysqa==0.2.2", + "h5py==3.12.1", + "pygraphviz==1.14", + "matplotlib==3.9.2", + "networkx==3.4.2", + "ipython==8.29.0", +] [tool.setuptools.packages.find] include = ["executorlib*"]