diff --git a/notebooks/2-hpc-submission.ipynb b/notebooks/2-hpc-submission.ipynb index d130f52c..f2a6b0a5 100644 --- a/notebooks/2-hpc-submission.ipynb +++ b/notebooks/2-hpc-submission.ipynb @@ -1 +1,218 @@ -{"metadata":{"kernelspec":{"name":"flux","display_name":"Flux","language":"python"},"language_info":{"name":"python","version":"3.12.7","mimetype":"text/x-python","codemirror_mode":{"name":"ipython","version":3},"pygments_lexer":"ipython3","nbconvert_exporter":"python","file_extension":".py"}},"nbformat_minor":5,"nbformat":4,"cells":[{"id":"ddf66f38-dc4a-4306-8b1c-b923fdb76922","cell_type":"markdown","source":"# HPC Submission Mode\nIn contrast to the [local mode] and the [HPC allocation mode] the HPC Submission Mode does not communicate via the [zero message queue](https://zeromq.org) but instead stores the python functions on the file system and uses the job scheduler to handle the dependencies of the Python functions. Consequently, the block allocation `block_allocation` and the init function `init_function` are not available in HPC Submission mode. At the same time it is possible to close the Python process which created the `Executor`, wait until the execution of the submitted Python functions is completed and afterwards reload the results from the cache. \n\nInternally the HPC submission mode is using the [Python simple queuing system adatper (pysqa)](https://pysqa.readthedocs.io) to connect to HPC job schedulers and the [h5py](https://www.h5py.org) package for serializing the Python functions to store them on the file system. Both packages are optional dependency of executorlib. The installation of the [pysqa](https://pysqa.readthedocs.io) package and the [h5py](https://www.h5py.org) package are covered in the installation section. ","metadata":{}},{"id":"d56862a6-8279-421d-a090-7ca2a3c4d416","cell_type":"markdown","source":"## SLURM\nThe [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com) job scheduler is currently the most commonly used job scheduler for HPC clusters. In the HPC submission mode executorlib internally uses the [sbatch](https://slurm.schedmd.com/sbatch.html) command this is in contrast to the [HPC allocatiom mode] which internally uses the [srun](https://slurm.schedmd.com/srun.html) command. \n\nThe connection to the job scheduler is based on the [Python simple queuing system adatper (pysqa)](https://pysqa.readthedocs.io). It provides a default configuration for most commonly used job schedulers including SLURM, in addition it is also possible to provide the submission template as part of the resource dictionary `resource_dict` or via the path to the configuration directory with the `pysqa_config_directory` parameter. All three options are covered in more detail on the [pysqa documentation](https://pysqa.readthedocs.io).","metadata":{}},{"id":"83b87fb2-9f07-46ec-9646-7e273378e72f","cell_type":"code","source":"from executorlib import Executor","metadata":{"trusted":true},"outputs":[],"execution_count":1},{"id":"b20913f3-59e4-418c-a399-866124f8e497","cell_type":"markdown","source":"In comparison to the [Local Mode](), the only two parameters which are changed are the specification of the backend as `backend=\"slurm_submission\"` and the requirement to specify the cache directory using the `cache_directory=\"./cache\"`. The rest of the syntax remains exactly the same, to simplify the up-scaling of simulation workflows. 
","metadata":{}},{"id":"0b8f3b77-6199-4736-9f28-3058c5230777","cell_type":"markdown","source":"```python\nwith Executor(backend=\"slurm_submission\", cache_directory=\"./cache\") as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n print([f.result() for f in future_lst])\n```","metadata":{}},{"id":"37bef7ac-ce3e-4d8a-b848-b1474c370bca","cell_type":"markdown","source":"Specific parameters for HPC submission mode like the maximum run time `\"run_time_max\"`, the maximum memory `\"memory_max\"` or the submission template for the job submission script `\"submission_template\"` can be specified as part of the resource dictionary. Again it is possible to specify the resource dictonary `resource_dicionary` either for each function in the `submit()` function or during the initialization of the `Executor`. ","metadata":{}},{"id":"658781de-f222-4235-8c26-b0f77a0831b3","cell_type":"markdown","source":"```python\nsubmission_template = \"\"\"\\\n#!/bin/bash\n#SBATCH --output=time.out\n#SBATCH --job-name={{job_name}}\n#SBATCH --chdir={{working_directory}}\n#SBATCH --get-user-env=L\n#SBATCH --partition={{partition}}\n{%- if run_time_max %}\n#SBATCH --time={{ [1, run_time_max // 60]|max }}\n{%- endif %}\n{%- if dependency %}\n#SBATCH --dependency=afterok:{{ dependency | join(',') }}\n{%- endif %}\n{%- if memory_max %}\n#SBATCH --mem={{memory_max}}G\n{%- endif %}\n#SBATCH --cpus-per-task={{cores}}\n\n{{command}}\n\"\"\"\n\nwith Executor(backend=\"slurm_submission\", cache_directory=\"./cache\") as exe:\n future = exe.submit(\n sum, [4, 4], \n resource_dict={\n \"submission_template\": submission_template, \n \"run_time_max\": 180, # in seconds \n })\n print([f.result() for f in future_lst])\n```","metadata":{}},{"id":"f7ad9c97-7743-4f87-9344-4299b2b31a56","cell_type":"markdown","source":"With these options executorlib in combination with the SLURM job scheduler provides a lot flexibility to configure the submission of Python functions depending on the specific configuration of the job scheduler. ","metadata":{}},{"id":"2a814efb-2fbc-41ba-98df-cf121d19ea66","cell_type":"markdown","source":"## Flux\nWhile most HPC job schedulers require extensive configuration before they can be tested, the [flux framework](http://flux-framework.org) can be installed with the conda package manager, as explained in the [installation section](). This simple installation makes the flux framework especially suitable for demonstrations, testing and continous integration. So below a number of features for the HPC submission mode are demonstrated based on the example of the [flux framework](http://flux-framework.org) still the same applies to other job schedulers like SLURM introduced above.","metadata":{}},{"id":"29d7aa18-357e-416e-805c-1322b59abec1","cell_type":"markdown","source":"### Dependencies\nAs already demonstrated for the [Local Mode]() the `Executor` class from executorlib is capable of resolving the dependencies of serial functions, when [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects are used as inputs for subsequent function calls. 
For the case of the HPC submission these dependencies are communicated to the job scheduler, which allows to stop the Python process which created the `Executor` class, wait until the execution of the submitted Python functions is completed and afterwards restart the Python process for the `Executor` class and reload the calculation results from the cache defined by the `cache_directory` parameter.","metadata":{}},{"id":"cac92da2-dcb9-4d17-8a9c-5e3c7e577357","cell_type":"code","source":"def add_funct(a, b):\n return a + b","metadata":{"trusted":true},"outputs":[],"execution_count":2},{"id":"754f480a-1592-4f32-b7a2-5d348d95954e","cell_type":"code","source":"with Executor(backend=\"flux_submission\", cache_directory=\"./cache\") as exe:\n future = 0\n for i in range(4, 8):\n future = exe.submit(add_funct, i, future)\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"22\n","output_type":"stream"}],"execution_count":3},{"id":"ca75cb6c-c50f-4bee-9b09-d8d29d6c263b","cell_type":"markdown","source":"### Resource Assignment\nIn analogy to the [Local Mode]() the resource assignment for the HPC submission mode is handled by either including the resource dictionary parameter `resource_dict` in the initialization of the `Executor` class or in every call of the `submit()` function. \n\nBelow this is demonstrated once for the assignment of muliple CPU cores for the execution of a Python function which internally uses the message passing interface (MPI) via the [mpi4py](https://mpi4py.readthedocs.io) package. ","metadata":{}},{"id":"b3f11ab4-e90b-48f3-9117-4b4b1669572e","cell_type":"code","source":"def calc(i):\n from mpi4py import MPI\n\n size = MPI.COMM_WORLD.Get_size()\n rank = MPI.COMM_WORLD.Get_rank()\n return i, size, rank","metadata":{"trusted":true},"outputs":[],"execution_count":4},{"id":"826ba382-6384-4a63-a379-ec48c73f6fcd","cell_type":"code","source":"with Executor(backend=\"flux_submission\", cache_directory=\"./cache\") as exe:\n fs = exe.submit(calc, 3, resource_dict={\"cores\": 2})\n print(fs.result())\n","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"[(3, 2, 0), (3, 2, 1)]\n","output_type":"stream"}],"execution_count":5},{"id":"d91499d7-5c6c-4c10-b7b7-bfc4b87ddaa8","cell_type":"markdown","source":"Beyond CPU cores and threads which were previously also introduced for the [Local Mode]() the HPC submission mode also provides the option to select the available accelerator cards or GPUs, by specifying the `\"gpus_per_core\"` parameter in the resource dictionary `resource_dict`. 
For demonstration we create a Python function which reads the GPU device IDs and submit it to the `Executor` class:\n```python\ndef get_available_gpus():\n import socket\n from tensorflow.python.client import device_lib\n local_device_protos = device_lib.list_local_devices()\n return [\n (x.name, x.physical_device_desc, socket.gethostname()) \n for x in local_device_protos if x.device_type == 'GPU'\n ]\n```\n\n```python\nwith Executor(\n backend=\"flux_submission\",\n cache_directory=\"./cache\",\n resource_dict={\"gpus_per_core\": 1}\n) as exe:\n fs_1 = exe.submit(get_available_gpus)\n fs_2 = exe.submit(get_available_gpus)\n print(fs_1.result(), fs_2.result())\n```","metadata":{}},{"id":"3f47fd34-04d1-42a7-bb06-6821dc99a648","cell_type":"markdown","source":"### Cleaning Cache\nFinally, as the HPC Submission Mode leverages the file system to communicate serialized Python functions, it is important to clean up the cache directory specified by the `cache_directory` parameter once the results of the submitted Python functions are no longer needed. The serialized Python functions are stored in binary format using the [cloudpickle](https://github.com/cloudpipe/cloudpickle) library for serialization. This format is design for caching but not for long-term storage. The user is responsible for the long-term storage of their data.","metadata":{}},{"id":"66d6d566-e1ea-4f16-8cb2-cc4e6c6b42e7","cell_type":"code","source":"import os\nimport shutil\n\ncache_dir = \"./cache\"\nif os.path.exists(cache_dir):\n print(os.listdir(cache_dir))\n try:\n shutil.rmtree(cache_dir)\n except OSError:\n pass","metadata":{"trusted":true},"outputs":[{"name":"stdout","text":"['run_queue.sh', 'time.out', 'error.out', 'add_functee0545e0d3edb8a4a6ceb6d5ae712d39.h5out', 'add_functd84e6bfb7b6ad4c0d984dbd536111add.h5out', 'add_funct95858a1edeb485034ec6f9739aaa3ce3.h5out', 'add_funct843c1fde7badf8cee51567c8f2122d4f.h5out', 'calc6f05130a7b724a84f9174df8a602ab12.h5out']\n","output_type":"stream"}],"execution_count":6},{"id":"1de93586-d302-4aa6-878a-51acfb1d3009","cell_type":"code","source":"","metadata":{"trusted":true},"outputs":[],"execution_count":null}]} \ No newline at end of file +{ + "cells": [ + { + "cell_type": "markdown", + "id": "ddf66f38-dc4a-4306-8b1c-b923fdb76922", + "metadata": {}, + "source": "# HPC Submission Mode\nIn contrast to the [local mode] and the [HPC allocation mode], the HPC submission mode does not communicate via [ZeroMQ](https://zeromq.org) but instead stores the Python functions on the file system and uses the job scheduler to handle the dependencies of the Python functions. Consequently, the block allocation `block_allocation` and the init function `init_function` are not available in the HPC submission mode. At the same time, it is possible to close the Python process which created the `Executor`, wait until the execution of the submitted Python functions is completed and afterwards reload the results from the cache. \n\nInternally, the HPC submission mode uses the [Python simple queuing system adapter (pysqa)](https://pysqa.readthedocs.io) to connect to HPC job schedulers and the [h5py](https://www.h5py.org) package to serialize the Python functions and store them on the file system. Both packages are optional dependencies of executorlib. The installation of the [pysqa](https://pysqa.readthedocs.io) package and the [h5py](https://www.h5py.org) package is covered in the installation section. 
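\n\nThe following minimal sketch illustrates this workflow with the SLURM backend introduced below; it assumes that resubmitting an identical function call with the same `cache_directory` reloads the cached result instead of submitting a new job:\n\n```python\nfrom executorlib import Executor\n\n# first Python process: submit the function and wait for the job scheduler\nwith Executor(backend=\"slurm_submission\", cache_directory=\"./cache\") as exe:\n    print(exe.submit(sum, [2, 2]).result())\n\n# this process can now be closed; a later Python process repeats the\n# submission and the result is reloaded from ./cache instead of recomputed\nwith Executor(backend=\"slurm_submission\", cache_directory=\"./cache\") as exe:\n    print(exe.submit(sum, [2, 2]).result())\n```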
" + }, + { + "cell_type": "markdown", + "id": "d56862a6-8279-421d-a090-7ca2a3c4d416", + "metadata": {}, + "source": "## SLURM\nThe [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com) job scheduler is currently the most commonly used job scheduler for HPC clusters. In the HPC submission mode executorlib internally uses the [sbatch](https://slurm.schedmd.com/sbatch.html) command this is in contrast to the [HPC allocatiom mode] which internally uses the [srun](https://slurm.schedmd.com/srun.html) command. \n\nThe connection to the job scheduler is based on the [Python simple queuing system adatper (pysqa)](https://pysqa.readthedocs.io). It provides a default configuration for most commonly used job schedulers including SLURM, in addition it is also possible to provide the submission template as part of the resource dictionary `resource_dict` or via the path to the configuration directory with the `pysqa_config_directory` parameter. All three options are covered in more detail on the [pysqa documentation](https://pysqa.readthedocs.io)." + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "83b87fb2-9f07-46ec-9646-7e273378e72f", + "metadata": { + "trusted": true + }, + "outputs": [], + "source": [ + "from executorlib import Executor" + ] + }, + { + "cell_type": "markdown", + "id": "b20913f3-59e4-418c-a399-866124f8e497", + "metadata": {}, + "source": "In comparison to the [Local Mode](), the only two parameters which are changed are the specification of the backend as `backend=\"slurm_submission\"` and the requirement to specify the cache directory using the `cache_directory=\"./cache\"`. The rest of the syntax remains exactly the same, to simplify the up-scaling of simulation workflows. " + }, + { + "cell_type": "markdown", + "id": "0b8f3b77-6199-4736-9f28-3058c5230777", + "metadata": {}, + "source": "```python\nwith Executor(backend=\"slurm_submission\", cache_directory=\"./cache\") as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n print([f.result() for f in future_lst])\n```" + }, + { + "cell_type": "markdown", + "id": "37bef7ac-ce3e-4d8a-b848-b1474c370bca", + "metadata": {}, + "source": "Specific parameters for HPC submission mode like the maximum run time `\"run_time_max\"`, the maximum memory `\"memory_max\"` or the submission template for the job submission script `\"submission_template\"` can be specified as part of the resource dictionary. Again it is possible to specify the resource dictonary `resource_dicionary` either for each function in the `submit()` function or during the initialization of the `Executor`. 
" + }, + { + "cell_type": "markdown", + "id": "658781de-f222-4235-8c26-b0f77a0831b3", + "metadata": {}, + "source": "```python\nsubmission_template = \"\"\"\\\n#!/bin/bash\n#SBATCH --output=time.out\n#SBATCH --job-name={{job_name}}\n#SBATCH --chdir={{working_directory}}\n#SBATCH --get-user-env=L\n#SBATCH --partition={{partition}}\n{%- if run_time_max %}\n#SBATCH --time={{ [1, run_time_max // 60]|max }}\n{%- endif %}\n{%- if dependency %}\n#SBATCH --dependency=afterok:{{ dependency | join(',') }}\n{%- endif %}\n{%- if memory_max %}\n#SBATCH --mem={{memory_max}}G\n{%- endif %}\n#SBATCH --cpus-per-task={{cores}}\n\n{{command}}\n\"\"\"\n\nwith Executor(backend=\"slurm_submission\", cache_directory=\"./cache\") as exe:\n future = exe.submit(\n sum, [4, 4], \n resource_dict={\n \"submission_template\": submission_template, \n \"run_time_max\": 180, # in seconds \n })\n print([f.result() for f in future_lst])\n```" + }, + { + "cell_type": "markdown", + "id": "f7ad9c97-7743-4f87-9344-4299b2b31a56", + "metadata": {}, + "source": "With these options executorlib in combination with the SLURM job scheduler provides a lot flexibility to configure the submission of Python functions depending on the specific configuration of the job scheduler. " + }, + { + "cell_type": "markdown", + "id": "2a814efb-2fbc-41ba-98df-cf121d19ea66", + "metadata": {}, + "source": "## Flux\nWhile most HPC job schedulers require extensive configuration before they can be tested, the [flux framework](http://flux-framework.org) can be installed with the conda package manager, as explained in the [installation section](). This simple installation makes the flux framework especially suitable for demonstrations, testing and continous integration. So below a number of features for the HPC submission mode are demonstrated based on the example of the [flux framework](http://flux-framework.org) still the same applies to other job schedulers like SLURM introduced above." + }, + { + "cell_type": "markdown", + "id": "29d7aa18-357e-416e-805c-1322b59abec1", + "metadata": {}, + "source": "### Dependencies\nAs already demonstrated for the [Local Mode]() the `Executor` class from executorlib is capable of resolving the dependencies of serial functions, when [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects are used as inputs for subsequent function calls. For the case of the HPC submission these dependencies are communicated to the job scheduler, which allows to stop the Python process which created the `Executor` class, wait until the execution of the submitted Python functions is completed and afterwards restart the Python process for the `Executor` class and reload the calculation results from the cache defined by the `cache_directory` parameter." 
+ }, + { + "cell_type": "code", + "execution_count": 2, + "id": "cac92da2-dcb9-4d17-8a9c-5e3c7e577357", + "metadata": { + "trusted": true + }, + "outputs": [], + "source": [ + "def add_funct(a, b):\n", + " return a + b" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "754f480a-1592-4f32-b7a2-5d348d95954e", + "metadata": { + "trusted": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": "22\n" + } + ], + "source": [ + "with Executor(backend=\"flux_submission\", cache_directory=\"./cache\") as exe:\n", + " future = 0\n", + " for i in range(4, 8):\n", + " future = exe.submit(add_funct, i, future)\n", + " print(future.result())" + ] + }, + { + "cell_type": "markdown", + "id": "ca75cb6c-c50f-4bee-9b09-d8d29d6c263b", + "metadata": {}, + "source": "### Resource Assignment\nIn analogy to the [Local Mode](), the resource assignment for the HPC submission mode is handled by either including the resource dictionary parameter `resource_dict` in the initialization of the `Executor` class or in every call of the `submit()` function. \n\nBelow, this is demonstrated for the assignment of multiple CPU cores to the execution of a Python function which internally uses the message passing interface (MPI) via the [mpi4py](https://mpi4py.readthedocs.io) package. " + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "b3f11ab4-e90b-48f3-9117-4b4b1669572e", + "metadata": { + "trusted": true + }, + "outputs": [], + "source": [ + "def calc(i):\n", + " from mpi4py import MPI\n", + "\n", + " size = MPI.COMM_WORLD.Get_size()\n", + " rank = MPI.COMM_WORLD.Get_rank()\n", + " return i, size, rank" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "826ba382-6384-4a63-a379-ec48c73f6fcd", + "metadata": { + "trusted": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": "[(3, 2, 0), (3, 2, 1)]\n" + } + ], + "source": [ + "with Executor(backend=\"flux_submission\", cache_directory=\"./cache\") as exe:\n", + " fs = exe.submit(calc, 3, resource_dict={\"cores\": 2})\n", + " print(fs.result())" + ] + }, + { + "cell_type": "markdown", + "id": "d91499d7-5c6c-4c10-b7b7-bfc4b87ddaa8", + "metadata": {}, + "source": "Beyond CPU cores and threads, which were previously introduced for the [Local Mode](), the HPC submission mode also provides the option to select the available accelerator cards or GPUs by specifying the `\"gpus_per_core\"` parameter in the resource dictionary `resource_dict`. 
For demonstration, we create a Python function which reads the GPU device IDs and submit it to the `Executor` class:\n```python\ndef get_available_gpus():\n import socket\n from tensorflow.python.client import device_lib\n local_device_protos = device_lib.list_local_devices()\n return [\n (x.name, x.physical_device_desc, socket.gethostname()) \n for x in local_device_protos if x.device_type == 'GPU'\n ]\n```\n\n```python\nwith Executor(\n backend=\"flux_submission\",\n cache_directory=\"./cache\",\n resource_dict={\"gpus_per_core\": 1}\n) as exe:\n fs_1 = exe.submit(get_available_gpus)\n fs_2 = exe.submit(get_available_gpus)\n print(fs_1.result(), fs_2.result())\n```" + }, + { + "cell_type": "markdown", + "id": "3f47fd34-04d1-42a7-bb06-6821dc99a648", + "metadata": {}, + "source": "### Cleaning Cache\nFinally, as the HPC submission mode leverages the file system to communicate serialized Python functions, it is important to clean up the cache directory specified by the `cache_directory` parameter once the results of the submitted Python functions are no longer needed. The serialized Python functions are stored in binary format using the [cloudpickle](https://github.com/cloudpipe/cloudpickle) library. This format is designed for caching but not for long-term storage; the user is responsible for the long-term storage of their data." + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "66d6d566-e1ea-4f16-8cb2-cc4e6c6b42e7", + "metadata": { + "trusted": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": "['run_queue.sh', 'time.out', 'error.out', 'add_functee0545e0d3edb8a4a6ceb6d5ae712d39.h5out', 'add_functd84e6bfb7b6ad4c0d984dbd536111add.h5out', 'add_funct95858a1edeb485034ec6f9739aaa3ce3.h5out', 'add_funct843c1fde7badf8cee51567c8f2122d4f.h5out', 'calc6f05130a7b724a84f9174df8a602ab12.h5out']\n" + } + ], + "source": [ + "import os\n", + "import shutil\n", + "\n", + "cache_dir = \"./cache\"\n", + "if os.path.exists(cache_dir):\n", + " print(os.listdir(cache_dir))\n", + " try:\n", + " shutil.rmtree(cache_dir)\n", + " except OSError:\n", + " pass" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1de93586-d302-4aa6-878a-51acfb1d3009", + "metadata": { + "trusted": true + }, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Flux", + "language": "python", + "name": "flux" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.7" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} \ No newline at end of file