diff --git a/notebooks/2-hpc-cluster.ipynb b/notebooks/2-hpc-cluster.ipynb index 8bc85182..f5ede326 100644 --- a/notebooks/2-hpc-cluster.ipynb +++ b/notebooks/2-hpc-cluster.ipynb @@ -1,108 +1,147 @@ { - "metadata": { - "kernelspec": { - "display_name": "Flux", - "language": "python", - "name": "flux" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.12.9" - } - }, - "nbformat_minor": 5, - "nbformat": 4, "cells": [ { - "id": "ddf66f38-dc4a-4306-8b1c-b923fdb76922", "cell_type": "markdown", - "source": "# HPC Cluster Executor\nIn contrast to the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) and the [HPC Job Executor](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) the HPC Submission Executors do not communicate via the [zero message queue](https://zeromq.org) but instead store the python functions on the file system and uses the job scheduler to handle the dependencies of the Python functions. Consequently, the block allocation `block_allocation` and the init function `init_function` are not available in the HPC Cluster Executors. At the same time it is possible to close the Python process which created the `Executor`, wait until the execution of the submitted Python functions is completed and afterwards reload the results from the cache.\n\nInternally the HPC submission mode is using the [Python simple queuing system adatper (pysqa)](https://pysqa.readthedocs.io) to connect to HPC job schedulers and the [h5py](https://www.h5py.org) package for serializing the Python functions to store them on the file system. Both packages are optional dependency of executorlib. The installation of the [pysqa](https://pysqa.readthedocs.io) package and the [h5py](https://www.h5py.org) package are covered in the installation section. ", - "metadata": {} + "id": "ddf66f38-dc4a-4306-8b1c-b923fdb76922", + "metadata": {}, + "source": [ + "# HPC Cluster Executor\n", + "In contrast to the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) and the [HPC Job Executor](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html), the HPC Submission Executors do not communicate via the [zero message queue](https://zeromq.org) but instead store the Python functions on the file system and use the job scheduler to handle the dependencies of the Python functions. Consequently, the block allocation `block_allocation` and the init function `init_function` are not available in the HPC Cluster Executors. At the same time, it is possible to close the Python process which created the `Executor`, wait until the execution of the submitted Python functions is completed and afterwards reload the results from the cache.\n", + "\n", + "Internally, the HPC submission mode uses the [Python simple queuing system adapter (pysqa)](https://pysqa.readthedocs.io) to connect to HPC job schedulers and the [h5py](https://www.h5py.org) package for serializing the Python functions to store them on the file system. Both packages are optional dependencies of executorlib. The installation of the [pysqa](https://pysqa.readthedocs.io) package and the [h5py](https://www.h5py.org) package is covered in the installation section. 
" + ] }, { - "id": "d56862a6-8279-421d-a090-7ca2a3c4d416", "cell_type": "markdown", - "source": "## SLURM\nThe [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com) job scheduler is currently the most commonly used job scheduler for HPC clusters. In the HPC submission mode executorlib internally uses the [sbatch](https://slurm.schedmd.com/sbatch.html) command this is in contrast to the [HPC allocatiom mode] which internally uses the [srun](https://slurm.schedmd.com/srun.html) command. \n\nThe connection to the job scheduler is based on the [Python simple queuing system adatper (pysqa)](https://pysqa.readthedocs.io). It provides a default configuration for most commonly used job schedulers including SLURM, in addition it is also possible to provide the submission template as part of the resource dictionary `resource_dict` or via the path to the configuration directory with the `pysqa_config_directory` parameter. All three options are covered in more detail on the [pysqa documentation](https://pysqa.readthedocs.io).", - "metadata": {} + "id": "d56862a6-8279-421d-a090-7ca2a3c4d416", + "metadata": {}, + "source": [ + "## SLURM\n", + "The [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com) job scheduler is currently the most commonly used job scheduler for HPC clusters. In the HPC submission mode executorlib internally uses the [sbatch](https://slurm.schedmd.com/sbatch.html) command this is in contrast to the [HPC allocatiom mode] which internally uses the [srun](https://slurm.schedmd.com/srun.html) command. \n", + "\n", + "The connection to the job scheduler is based on the [Python simple queuing system adatper (pysqa)](https://pysqa.readthedocs.io). It provides a default configuration for most commonly used job schedulers including SLURM, in addition it is also possible to provide the submission template as part of the resource dictionary `resource_dict` or via the path to the configuration directory with the `pysqa_config_directory` parameter. All three options are covered in more detail on the [pysqa documentation](https://pysqa.readthedocs.io)." + ] }, { - "id": "db7760e8-35a6-4a1c-8b0f-410b536c3835", "cell_type": "markdown", - "source": "```python\nfrom executorlib import SlurmClusterExecutor\n```", - "metadata": {} + "id": "db7760e8-35a6-4a1c-8b0f-410b536c3835", + "metadata": {}, + "source": [ + "```python\n", + "from executorlib import SlurmClusterExecutor\n", + "```" + ] }, { - "id": "b20913f3-59e4-418c-a399-866124f8e497", "cell_type": "markdown", - "source": "In comparison to the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html), the only parameter which is changed in the `SlurmClusterExecutor` is the requirement to specify the cache directory using the `cache_directory=\"./cache\"`. The rest of the syntax remains exactly the same, to simplify the up-scaling of simulation workflows.", - "metadata": {} + "id": "b20913f3-59e4-418c-a399-866124f8e497", + "metadata": {}, + "source": [ + "In comparison to the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html), the only parameter which is changed in the `SlurmClusterExecutor` is the requirement to specify the cache directory using the `cache_directory=\"./cache\"`. The rest of the syntax remains exactly the same, to simplify the up-scaling of simulation workflows." 
+ ] }, { - "id": "0b8f3b77-6199-4736-9f28-3058c5230777", "cell_type": "markdown", - "source": "```python\nwith SlurmClusterExecutor(cache_directory=\"./cache\") as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n print([f.result() for f in future_lst])\n```", - "metadata": {} + "id": "0b8f3b77-6199-4736-9f28-3058c5230777", + "metadata": {}, + "source": [ + "```python\n", + "with SlurmClusterExecutor(cache_directory=\"./cache\") as exe:\n", + " future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n", + " print([f.result() for f in future_lst])\n", + "```" + ] }, { - "id": "37bef7ac-ce3e-4d8a-b848-b1474c370bca", "cell_type": "markdown", - "source": "Specific parameters for `SlurmClusterExecutor` like the maximum run time `\"run_time_max\"`, the maximum memory `\"memory_max\"` or the submission template for the job submission script `\"submission_template\"` can be specified as part of the resource dictionary. Again it is possible to specify the resource dictonary `resource_dicionary` either for each function in the `submit()` function or during the initialization of the `SlurmClusterExecutor`.", - "metadata": {} + "id": "37bef7ac-ce3e-4d8a-b848-b1474c370bca", + "metadata": {}, + "source": [ + "Specific parameters for the `SlurmClusterExecutor`, like the maximum run time `\"run_time_max\"`, the maximum memory `\"memory_max\"` or the submission template for the job submission script `\"submission_template\"`, can be specified as part of the resource dictionary. Again, it is possible to specify the resource dictionary `resource_dict` either for each function in the `submit()` function or during the initialization of the `SlurmClusterExecutor`." + ] }, { - "id": "658781de-f222-4235-8c26-b0f77a0831b3", "cell_type": "markdown", - "source": "```python\nsubmission_template = \"\"\"\\\n#!/bin/bash\n#SBATCH --output=time.out\n#SBATCH --job-name={{job_name}}\n#SBATCH --chdir={{working_directory}}\n#SBATCH --get-user-env=L\n#SBATCH --partition={{partition}}\n{%- if run_time_max %}\n#SBATCH --time={{ [1, run_time_max // 60]|max }}\n{%- endif %}\n{%- if dependency %}\n#SBATCH --dependency=afterok:{{ dependency | join(',') }}\n{%- endif %}\n{%- if memory_max %}\n#SBATCH --mem={{memory_max}}G\n{%- endif %}\n#SBATCH --cpus-per-task={{cores}}\n\n{{command}}\n\"\"\"\n\nwith SlurmClusterExecutor(cache_directory=\"./cache\") as exe:\n future = exe.submit(\n sum, [4, 4], \n resource_dict={\n \"submission_template\": submission_template, \n \"run_time_max\": 180, # in seconds \n })\n print(future.result())\n```", - "metadata": {} + "id": "658781de-f222-4235-8c26-b0f77a0831b3", + "metadata": {}, + "source": [ + "```python\n", + "submission_template = \"\"\"\\\n", + "#!/bin/bash\n", + "#SBATCH --output=time.out\n", + "#SBATCH --job-name={{job_name}}\n", + "#SBATCH --chdir={{working_directory}}\n", + "#SBATCH --get-user-env=L\n", + "#SBATCH --partition={{partition}}\n", + "{%- if run_time_max %}\n", + "#SBATCH --time={{ [1, run_time_max // 60]|max }}\n", + "{%- endif %}\n", + "{%- if dependency %}\n", + "#SBATCH --dependency=afterok:{{ dependency | join(',') }}\n", + "{%- endif %}\n", + "{%- if memory_max %}\n", + "#SBATCH --mem={{memory_max}}G\n", + "{%- endif %}\n", + "#SBATCH --cpus-per-task={{cores}}\n", + "\n", + "{{command}}\n", + "\"\"\"\n", + "\n", + "with SlurmClusterExecutor(cache_directory=\"./cache\") as exe:\n", + " future = exe.submit(\n", + " sum, [4, 4], \n", + " resource_dict={\n", + " \"submission_template\": submission_template, \n", + " \"run_time_max\": 180, # in seconds 
\n", + " \"partition\": \"s.cmfe\",\n", + " })\n", + " print(future.result())\n", + "```" + ] }, { - "id": "f7ad9c97-7743-4f87-9344-4299b2b31a56", "cell_type": "markdown", - "source": "With these options executorlib in combination with the SLURM job scheduler provides a lot flexibility to configure the submission of Python functions depending on the specific configuration of the job scheduler. ", - "metadata": {} + "id": "f7ad9c97-7743-4f87-9344-4299b2b31a56", + "metadata": {}, + "source": [ + "With these options executorlib in combination with the SLURM job scheduler provides a lot flexibility to configure the submission of Python functions depending on the specific configuration of the job scheduler. " + ] }, { - "id": "2a814efb-2fbc-41ba-98df-cf121d19ea66", "cell_type": "markdown", - "source": "## Flux\nWhile most HPC job schedulers require extensive configuration before they can be tested, the [flux framework](http://flux-framework.org) can be installed with the conda package manager, as explained in the [installation section](https://executorlib.readthedocs.io/en/latest/installation.html#alternative-installations). This simple installation makes the flux framework especially suitable for demonstrations, testing and continous integration. So below a number of features for the HPC submission mode are demonstrated based on the example of the [flux framework](http://flux-framework.org) still the same applies to other job schedulers like SLURM introduced above.", - "metadata": {} + "id": "2a814efb-2fbc-41ba-98df-cf121d19ea66", + "metadata": {}, + "source": [ + "## Flux\n", + "While most HPC job schedulers require extensive configuration before they can be tested, the [flux framework](http://flux-framework.org) can be installed with the conda package manager, as explained in the [installation section](https://executorlib.readthedocs.io/en/latest/installation.html#alternative-installations). This simple installation makes the flux framework especially suitable for demonstrations, testing and continous integration. So below a number of features for the HPC submission mode are demonstrated based on the example of the [flux framework](http://flux-framework.org) still the same applies to other job schedulers like SLURM introduced above." + ] }, { - "id": "29d7aa18-357e-416e-805c-1322b59abec1", "cell_type": "markdown", - "source": "### Dependencies\nAs already demonstrated for the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) the `Executor` classes from executorlib are capable of resolving the dependencies of serial functions, when [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects are used as inputs for subsequent function calls. 
For the case of the HPC submission these dependencies are communicated to the job scheduler, which allows to stop the Python process which created the `Executor` class, wait until the execution of the submitted Python functions is completed and afterwards restart the Python process for the `Executor` class and reload the calculation results from the cache defined by the `cache_directory` parameter.", - "metadata": {} + "id": "29d7aa18-357e-416e-805c-1322b59abec1", + "metadata": {}, + "source": [ + "### Dependencies\n", + "As already demonstrated for the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html), the `Executor` classes from executorlib are capable of resolving the dependencies of serial functions, when [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects are used as inputs for subsequent function calls. For the case of the HPC submission, these dependencies are communicated to the job scheduler, which makes it possible to stop the Python process which created the `Executor` class, wait until the execution of the submitted Python functions is completed and afterwards restart the Python process for the `Executor` class and reload the calculation results from the cache defined by the `cache_directory` parameter." + ] }, { - "id": "0f7fc37a-1248-492d-91ab-9db1d737eaee", "cell_type": "code", - "source": "def add_funct(a, b):\n return a + b", + "execution_count": 1, + "id": "0f7fc37a-1248-492d-91ab-9db1d737eaee", "metadata": { "trusted": false }, "outputs": [], - "execution_count": 1 + "source": [ + "def add_funct(a, b):\n", + " return a + b" + ] }, { - "id": "ae308683-6083-4e78-afc2-bff6c6dc297b", "cell_type": "code", - "source": [ - "from executorlib import FluxClusterExecutor\n", - "\n", - "with FluxClusterExecutor(cache_directory=\"./file\") as exe:\n", - " future = 0\n", - " for i in range(4, 8):\n", - " future = exe.submit(add_funct, i, future)\n", - " print(future.result())" - ], + "execution_count": 2, + "id": "ae308683-6083-4e78-afc2-bff6c6dc297b", "metadata": { "trusted": false }, @@ -110,35 +149,53 @@ { "name": "stdout", "output_type": "stream", - "text": "22\n" + "text": [ + "22\n" + ] } ], - "execution_count": 2 + "source": [ + "from executorlib import FluxClusterExecutor\n", + "\n", + "with FluxClusterExecutor(cache_directory=\"./file\") as exe:\n", + " future = 0\n", + " for i in range(4, 8):\n", + " future = exe.submit(add_funct, i, future)\n", + " print(future.result())" + ] }, { - "id": "ca75cb6c-c50f-4bee-9b09-d8d29d6c263b", "cell_type": "markdown", - "source": "### Resource Assignment\nIn analogy to the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) the resource assignment for the `FluxClusterExecutor` is handled by either including the resource dictionary parameter `resource_dict` in the initialization of the `FluxClusterExecutor` class or in every call of the `submit()` function.\n\nBelow this is demonstrated once for the assignment of multiple CPU cores for the execution of a Python function which internally uses the message passing interface (MPI) via the [mpi4py](https://mpi4py.readthedocs.io) package.", - "metadata": {} + "id": "ca75cb6c-c50f-4bee-9b09-d8d29d6c263b", + "metadata": {}, + "source": [ + "### Resource Assignment\n", + "In analogy to the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html), the resource assignment for the `FluxClusterExecutor` is handled by either including the resource dictionary 
parameter `resource_dict` in the initialization of the `FluxClusterExecutor` class or in every call of the `submit()` function.\n", + "\n", + "Below, this is demonstrated once for the assignment of multiple CPU cores for the execution of a Python function which internally uses the message passing interface (MPI) via the [mpi4py](https://mpi4py.readthedocs.io) package." + ] }, { - "id": "eded3a0f-e54f-44f6-962f-eedde4bd2158", "cell_type": "code", - "source": "def calc(i):\n from mpi4py import MPI\n\n size = MPI.COMM_WORLD.Get_size()\n rank = MPI.COMM_WORLD.Get_rank()\n return i, size, rank\n", + "execution_count": 3, + "id": "eded3a0f-e54f-44f6-962f-eedde4bd2158", "metadata": { "trusted": false }, "outputs": [], - "execution_count": 3 + "source": [ + "def calc(i):\n", + " from mpi4py import MPI\n", + "\n", + " size = MPI.COMM_WORLD.Get_size()\n", + " rank = MPI.COMM_WORLD.Get_rank()\n", + " return i, size, rank\n" + ] }, { - "id": "669b05df-3cb2-4f69-9d94-8b2442745ebb", "cell_type": "code", - "source": [ - "with FluxClusterExecutor(cache_directory=\"./file\") as exe:\n", - " fs = exe.submit(calc, 3, resource_dict={\"cores\": 2})\n", - " print(fs.result())" - ], + "execution_count": 4, + "id": "669b05df-3cb2-4f69-9d94-8b2442745ebb", "metadata": { "trusted": false }, @@ -146,26 +203,70 @@ { "name": "stdout", "output_type": "stream", - "text": "[(3, 2, 0), (3, 2, 1)]\n" + "text": [ + "[(3, 2, 0), (3, 2, 1)]\n" + ] } ], - "execution_count": 4 + "source": [ + "with FluxClusterExecutor(cache_directory=\"./file\") as exe:\n", + " fs = exe.submit(calc, 3, resource_dict={\"cores\": 2})\n", + " print(fs.result())" + ] }, { - "id": "d91499d7-5c6c-4c10-b7b7-bfc4b87ddaa8", "cell_type": "markdown", - "source": "Beyond CPU cores and threads which were previously also introduced for the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) the HPC Cluster Executors also provide the option to select the available accelerator cards or GPUs, by specifying the `\"gpus_per_core\"` parameter in the resource dictionary `resource_dict`. For demonstration we create a Python function which reads the GPU device IDs and submit it to the `FluxClusterExecutor` class:\n```python\ndef get_available_gpus():\n import socket\n from tensorflow.python.client import device_lib\n local_device_protos = device_lib.list_local_devices()\n return [\n (x.name, x.physical_device_desc, socket.gethostname()) \n for x in local_device_protos if x.device_type == 'GPU'\n ]\n```\n\n```python\nwith FluxClusterExecutor(\n cache_directory=\"./cache\",\n resource_dict={\"gpus_per_core\": 1}\n) as exe:\n fs_1 = exe.submit(get_available_gpus)\n fs_2 = exe.submit(get_available_gpus)\n print(fs_1.result(), fs_2.result())\n```", - "metadata": {} + "id": "d91499d7-5c6c-4c10-b7b7-bfc4b87ddaa8", + "metadata": {}, + "source": [ + "Beyond CPU cores and threads, which were previously introduced for the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html), the HPC Cluster Executors also provide the option to select the available accelerator cards or GPUs by specifying the `\"gpus_per_core\"` parameter in the resource dictionary `resource_dict`. 
For demonstration, we create a Python function which reads the GPU device IDs and submit it to the `FluxClusterExecutor` class:\n", + "```python\n", + "def get_available_gpus():\n", + " import socket\n", + " from tensorflow.python.client import device_lib\n", + " local_device_protos = device_lib.list_local_devices()\n", + " return [\n", + " (x.name, x.physical_device_desc, socket.gethostname()) \n", + " for x in local_device_protos if x.device_type == 'GPU'\n", + " ]\n", + "```\n", + "\n", + "```python\n", + "with FluxClusterExecutor(\n", + " cache_directory=\"./cache\",\n", + " resource_dict={\"gpus_per_core\": 1}\n", + ") as exe:\n", + " fs_1 = exe.submit(get_available_gpus)\n", + " fs_2 = exe.submit(get_available_gpus)\n", + " print(fs_1.result(), fs_2.result())\n", + "```" + ] }, { - "id": "3f47fd34-04d1-42a7-bb06-6821dc99a648", "cell_type": "markdown", - "source": "### Cleaning Cache\nFinally, as the HPC Cluster Executors leverage the file system to communicate serialized Python functions, it is important to clean up the cache directory specified by the `cache_directory` parameter once the results of the submitted Python functions are no longer needed. The serialized Python functions are stored in binary format using the [cloudpickle](https://github.com/cloudpipe/cloudpickle) library for serialization. This format is design for caching but not for long-term storage. The user is responsible for the long-term storage of their data.", - "metadata": {} + "id": "3f47fd34-04d1-42a7-bb06-6821dc99a648", + "metadata": {}, + "source": [ + "### Cleaning Cache\n", + "Finally, as the HPC Cluster Executors leverage the file system to communicate serialized Python functions, it is important to clean up the cache directory specified by the `cache_directory` parameter once the results of the submitted Python functions are no longer needed. The serialized Python functions are stored in binary format using the [cloudpickle](https://github.com/cloudpipe/cloudpickle) library for serialization. This format is designed for caching but not for long-term storage. The user is responsible for the long-term storage of their data."
+ ] }, { - "id": "f537b4f6-cc98-43da-8aca-94a823bcbcbd", "cell_type": "code", + "execution_count": 5, + "id": "f537b4f6-cc98-43da-8aca-94a823bcbcbd", + "metadata": { + "trusted": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "['add_functdce32a0e7f6eac9e4e19fec335b79726', 'calc76234667eef65c770fecf54645ef8ada', 'add_functee0545e0d3edb8a4a6ceb6d5ae712d39', 'add_funct3263a1038c0d088677685b6eccd9f7b7', 'add_funct6034ded02bdb3ff97695f3a94455ca4d']\n" + ] + } + ], "source": [ "import os\n", "import shutil\n", @@ -177,18 +278,28 @@ " shutil.rmtree(cache_dir)\n", " except OSError:\n", " pass" - ], - "metadata": { - "trusted": false + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Flux", + "language": "python", + "name": "flux" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": "['add_functdce32a0e7f6eac9e4e19fec335b79726', 'calc76234667eef65c770fecf54645ef8ada', 'add_functee0545e0d3edb8a4a6ceb6d5ae712d39', 'add_funct3263a1038c0d088677685b6eccd9f7b7', 'add_funct6034ded02bdb3ff97695f3a94455ca4d']\n" - } - ], - "execution_count": 5 + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" } - ] + }, + "nbformat": 4, + "nbformat_minor": 5 } diff --git a/notebooks/3-hpc-job.ipynb b/notebooks/3-hpc-job.ipynb index 0c266722..21077179 100644 --- a/notebooks/3-hpc-job.ipynb +++ b/notebooks/3-hpc-job.ipynb @@ -43,7 +43,7 @@ "metadata": {}, "source": [ "```python\n", - "with SlurmAllocationExecutor() as exe:\n", + "with SlurmJobExecutor() as exe:\n", " future = exe.submit(sum, [1, 1])\n", " print(future.result())\n", "```"
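Not part of the diff, but a reviewer-side sketch for context: the updated notebook text mentions that, besides passing a `submission_template` through `resource_dict`, a pysqa configuration directory can be supplied via the `pysqa_config_directory` parameter, which the notebooks themselves do not demonstrate. The configuration path below is a placeholder; it assumes a pysqa queue configuration (e.g. a `queues.yaml`) for the target SLURM cluster already exists in that directory and that the optional pysqa/h5py dependencies are installed.

```python
from executorlib import SlurmClusterExecutor

# Sketch under assumptions: "/path/to/pysqa/config" is a placeholder for a
# directory containing a pysqa queue configuration for the target SLURM
# cluster; sbatch must be available and ./cache must be writable.
with SlurmClusterExecutor(
    cache_directory="./cache",
    pysqa_config_directory="/path/to/pysqa/config",
) as exe:
    future = exe.submit(sum, [2, 2])
    print(future.result())  # expected output: 4
```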