Skip to content

Commit

Permalink
Add flux executor shutdown (#479)
Browse files Browse the repository at this point in the history
* Add flux executor shutdown

* update jupyter notebook

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* shutdown(wait=False)

* remove shutdown()

* Use backend="flux"

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
jan-janssen and pre-commit-ci[bot] authored Nov 6, 2024
1 parent 3d7c8bf commit 2132028
Showing 1 changed file with 68 additions and 84 deletions.
152 changes: 68 additions & 84 deletions notebooks/examples.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,11 @@
"metadata": {},
"outputs": [],
"source": [
"import flux.job\n",
"from executorlib import Executor\n",
"\n",
"with flux.job.FluxExecutor() as flux_exe:\n",
" with Executor(max_cores=1, flux_executor=flux_exe) as exe:\n",
" future = exe.submit(sum, [1, 1])\n",
" print(future.result())"
"with Executor(max_cores=1, backend=\"flux\") as exe:\n",
" future = exe.submit(sum, [1, 1])\n",
" print(future.result())"
]
},
{
Expand All @@ -98,28 +96,26 @@
"metadata": {},
"outputs": [],
"source": [
"import flux.job\n",
"from executorlib import Executor\n",
"\n",
"\n",
"def calc(*args):\n",
" return sum(*args)\n",
"\n",
"\n",
"with flux.job.FluxExecutor() as flux_exe:\n",
" with Executor(max_cores=2, flux_executor=flux_exe) as exe:\n",
" fs_1 = exe.submit(calc, [2, 1])\n",
" fs_2 = exe.submit(calc, [2, 2])\n",
" fs_3 = exe.submit(calc, [2, 3])\n",
" fs_4 = exe.submit(calc, [2, 4])\n",
" print(\n",
" [\n",
" fs_1.result(),\n",
" fs_2.result(),\n",
" fs_3.result(),\n",
" fs_4.result(),\n",
" ]\n",
" )"
"with Executor(max_cores=2, backend=\"flux\") as exe:\n",
" fs_1 = exe.submit(calc, [2, 1])\n",
" fs_2 = exe.submit(calc, [2, 2])\n",
" fs_3 = exe.submit(calc, [2, 3])\n",
" fs_4 = exe.submit(calc, [2, 4])\n",
" print(\n",
" [\n",
" fs_1.result(),\n",
" fs_2.result(),\n",
" fs_3.result(),\n",
" fs_4.result(),\n",
" ]\n",
" )"
]
},
{
Expand Down Expand Up @@ -156,17 +152,15 @@
"metadata": {},
"outputs": [],
"source": [
"import flux.job\n",
"from executorlib import Executor\n",
"\n",
"\n",
"def calc(*args):\n",
" return sum(*args)\n",
"\n",
"\n",
"with flux.job.FluxExecutor() as flux_exe:\n",
" with Executor(max_cores=2, flux_executor=flux_exe) as exe:\n",
" print(list(exe.map(calc, [[2, 1], [2, 2], [2, 3], [2, 4]])))"
"with Executor(max_cores=2, backend=\"flux\") as exe:\n",
" print(list(exe.map(calc, [[2, 1], [2, 2], [2, 3], [2, 4]])))"
]
},
{
Expand Down Expand Up @@ -272,27 +266,25 @@
"metadata": {},
"outputs": [],
"source": [
"import flux.job\n",
"from executorlib import Executor\n",
"\n",
"\n",
"def calc_function(parameter_a, parameter_b):\n",
" return parameter_a + parameter_b\n",
"\n",
"\n",
"with flux.job.FluxExecutor() as flux_exe:\n",
" with Executor(\n",
" # Resource definition on the executor level\n",
" max_cores=2, # total number of cores available to the Executor\n",
" block_allocation=True, # reuse python processes\n",
" flux_executor=flux_exe,\n",
" ) as exe:\n",
" future_obj = exe.submit(\n",
" calc_function,\n",
" 1, # parameter_a\n",
" parameter_b=2,\n",
" )\n",
" print(future_obj.result())"
"with Executor(\n",
" # Resource definition on the executor level\n",
" max_cores=2, # total number of cores available to the Executor\n",
" block_allocation=True, # reuse python processes\n",
" backend=\"flux\",\n",
") as exe:\n",
" future_obj = exe.submit(\n",
" calc_function,\n",
" 1, # parameter_a\n",
" parameter_b=2,\n",
" )\n",
" print(future_obj.result())"
]
},
{
Expand Down Expand Up @@ -326,7 +318,6 @@
"metadata": {},
"outputs": [],
"source": [
"import flux.job\n",
"from executorlib import Executor\n",
"\n",
"\n",
Expand All @@ -338,15 +329,14 @@
" return {\"j\": 4, \"k\": 3, \"l\": 2}\n",
"\n",
"\n",
"with flux.job.FluxExecutor() as flux_exe:\n",
" with Executor(\n",
" max_cores=1,\n",
" init_function=init_function,\n",
" flux_executor=flux_exe,\n",
" block_allocation=True,\n",
" ) as exe:\n",
" fs = exe.submit(calc, 2, j=5)\n",
" print(fs.result())"
"with Executor(\n",
" max_cores=1,\n",
" init_function=init_function,\n",
" backend=\"flux\",\n",
" block_allocation=True,\n",
") as exe:\n",
" fs = exe.submit(calc, 2, j=5)\n",
" print(fs.result())"
]
},
{
Expand Down Expand Up @@ -458,7 +448,6 @@
"metadata": {},
"outputs": [],
"source": [
"import flux.job\n",
"from executorlib import Executor\n",
"\n",
"\n",
Expand All @@ -470,15 +459,14 @@
" return i, size, rank\n",
"\n",
"\n",
"with flux.job.FluxExecutor() as flux_exe:\n",
" with Executor(\n",
" max_cores=2,\n",
" resource_dict={\"cores\": 2},\n",
" flux_executor=flux_exe,\n",
" flux_executor_pmi_mode=\"pmix\",\n",
" ) as exe:\n",
" fs = exe.submit(calc, 3)\n",
" print(fs.result())"
"with Executor(\n",
" max_cores=2,\n",
" resource_dict={\"cores\": 2},\n",
" backend=\"flux\",\n",
" flux_executor_pmi_mode=\"pmix\",\n",
") as exe:\n",
" fs = exe.submit(calc, 3)\n",
" print(fs.result())"
]
},
{
Expand Down Expand Up @@ -518,7 +506,6 @@
"source": [
"```\n",
"import socket\n",
"import flux.job\n",
"from executorlib import Executor\n",
"from tensorflow.python.client import device_lib\n",
"\n",
Expand All @@ -529,15 +516,14 @@
" for x in local_device_protos if x.device_type == 'GPU'\n",
" ]\n",
"\n",
"with flux.job.FluxExecutor() as flux_exe:\n",
" with Executor(\n",
" max_workers=2, \n",
" gpus_per_worker=1,\n",
" executor=flux_exe,\n",
" ) as exe:\n",
" fs_1 = exe.submit(get_available_gpus)\n",
" fs_2 = exe.submit(get_available_gpus)\n",
" print(fs_1.result(), fs_2.result())\n",
"with Executor(\n",
" max_workers=2, \n",
" gpus_per_worker=1,\n",
" backend=\"flux\",\n",
") as exe:\n",
" fs_1 = exe.submit(get_available_gpus)\n",
" fs_2 = exe.submit(get_available_gpus)\n",
" print(fs_1.result(), fs_2.result())\n",
"```"
]
},
Expand Down Expand Up @@ -634,29 +620,27 @@
"metadata": {},
"outputs": [],
"source": [
"import flux.job\n",
"from executorlib import Executor\n",
"\n",
"\n",
"def calc_function(parameter_a, parameter_b):\n",
" return parameter_a + parameter_b\n",
"\n",
"\n",
"with flux.job.FluxExecutor() as flux_exe:\n",
" with Executor(max_cores=2, flux_executor=flux_exe) as exe:\n",
" future_1 = exe.submit(\n",
" calc_function,\n",
" 1,\n",
" parameter_b=2,\n",
" resource_dict={\"cores\": 1},\n",
" )\n",
" future_2 = exe.submit(\n",
" calc_function,\n",
" 1,\n",
" parameter_b=future_1,\n",
" resource_dict={\"cores\": 1},\n",
" )\n",
" print(future_2.result())"
"with Executor(max_cores=2, backend=\"flux\") as exe:\n",
" future_1 = exe.submit(\n",
" calc_function,\n",
" 1,\n",
" parameter_b=2,\n",
" resource_dict={\"cores\": 1},\n",
" )\n",
" future_2 = exe.submit(\n",
" calc_function,\n",
" 1,\n",
" parameter_b=future_1,\n",
" resource_dict={\"cores\": 1},\n",
" )\n",
" print(future_2.result())"
]
},
{
Expand Down

0 comments on commit 2132028

Please sign in to comment.