From a2e4198bf1f34087b84e6cc71986fea2b26dba72 Mon Sep 17 00:00:00 2001
From: Guillaume EB
Date: Sat, 23 Jan 2021 14:47:43 +0000
Subject: [PATCH 1/3] Doc on handling worker with walltime

---
 docs/source/advanced-tips-and-tricks.rst | 54 ++++++++++++++++++++++++
 1 file changed, 54 insertions(+)

diff --git a/docs/source/advanced-tips-and-tricks.rst b/docs/source/advanced-tips-and-tricks.rst
index 95b4d884..8f1c59ac 100644
--- a/docs/source/advanced-tips-and-tricks.rst
+++ b/docs/source/advanced-tips-and-tricks.rst
@@ -69,6 +69,60 @@ accepted option on some SLURM clusters. The error was something like this:
 
     sbatch: error: Batch job submission failed: Requested node configuration is not available
 
+How to handle job queueing system walltime killing workers
+----------------------------------------------------------
+In dask-jobqueue, every worker processes run inside a job, and all jobs have a time limit in job queueing systems.
+Reaching walltime can be troublesome in several cases:
+- when you don't have a lot of room on you HPC platform and have only a few workers at a time (less than what you were hopping for when using scale or adapt). These workers will be killed (and others started) before you workload ends.
+- when you really don't know how long your workload will take: all your workers could be killed before reaching the end. In this case, you'll want to use adaptive clusters so that Dask ensures some workers are always up.
+
+If you don't set the proper parameters, you'll run into KilleWorker exceptions in those two cases.
+
+The solution to this problem is to tell Dask up front that the workers have a finit life time:
+- Use `--lifetime` worker option. This will enables infinite workloads using adaptive. Workers will be properly shut down before the scheduling system kills them, and all their states moved.
+- Use `--lifetime-stagger` when dealing with many workers (say > 20): this will allow to avoid workers all terminating at the same time, and so to ease rebalancing tasks and scheduling burden.
+
+Here is an example of how to use these parameters:
+
+.. code-block:: python
+
+    cluster = Cluster(walltime='01:00:00', cores=4, memory='16gb', extra=["--lifetime", "55m", "--lifetime-stagger", "4m"])
+    cluster.adapt(minimum=0, maximum=200)
+
+
+Here is an example of a workflow taking advantage of this, if you wan't to give it a try or adapt it to your use case:
+
+.. code-block:: python
+
+    import time
+    import numpy as np
+    from dask_jobqueue import PBSCluster as Cluster
+    from dask import delayed
+    from dask.distributed import Client, as_completed
+
+    # config in $HOME/.config/dask/jobqueue.yaml
+    cluster = Cluster(walltime='00:01:00', cores=1, memory='4gb')
+    cluster.adapt(minimum=0, maximum=4)
+
+    client = Client(cluster)
+
+    # each job takes 1s, and we have 4 cpus * 1 min * 60s/min = 240 cpu.s, let's ask for a little more tasks.
+    filenames = [f'img{num}.jpg' for num in range(480)]
+
+    def features(num_fn):
+        num, image_fn = num_fn
+        time.sleep(1)  # takes about 1s to compute features on an image
+        features = np.random.random(246)
+        return num, features
+
+    num_files = len(filenames)
+    num_features = len(features((0, filenames[0]))[1])  # FIX
+
+    X = np.zeros((num_files, num_features), dtype=np.float32)
+
+    for future in as_completed(client.map(features, list(enumerate(filenames)))):  # FIX
+        i, v = future.result()
+        X[i, :] = v

From 0fb0a56b6e42b56601147bfc547aab2540ba391b Mon Sep 17 00:00:00 2001
From: Guillaume EB
Date: Sat, 23 Jan 2021 14:52:27 +0000
Subject: [PATCH 2/3] Improving inlining

---
 docs/source/advanced-tips-and-tricks.rst | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/source/advanced-tips-and-tricks.rst b/docs/source/advanced-tips-and-tricks.rst
index 8f1c59ac..3f29365c 100644
--- a/docs/source/advanced-tips-and-tricks.rst
+++ b/docs/source/advanced-tips-and-tricks.rst
@@ -74,12 +74,14 @@ How to handle job queueing system walltime killing workers
 In dask-jobqueue, every worker processes run inside a job, and all jobs have a time limit in job queueing systems.
 Reaching walltime can be troublesome in several cases:
+
 - when you don't have a lot of room on you HPC platform and have only a few workers at a time (less than what you were hopping for when using scale or adapt). These workers will be killed (and others started) before you workload ends.
 - when you really don't know how long your workload will take: all your workers could be killed before reaching the end. In this case, you'll want to use adaptive clusters so that Dask ensures some workers are always up.
 
 If you don't set the proper parameters, you'll run into KilleWorker exceptions in those two cases.
 
 The solution to this problem is to tell Dask up front that the workers have a finit life time:
+
 - Use `--lifetime` worker option. This will enables infinite workloads using adaptive. Workers will be properly shut down before the scheduling system kills them, and all their states moved.
 - Use `--lifetime-stagger` when dealing with many workers (say > 20): this will allow to avoid workers all terminating at the same time, and so to ease rebalancing tasks and scheduling burden.

From dc41575fd8d9f55c63dacfae33bc1cce7d32d661 Mon Sep 17 00:00:00 2001
From: Guillaume EB
Date: Sat, 23 Jan 2021 20:18:01 +0000
Subject: [PATCH 3/3] Fix typos

---
 docs/source/advanced-tips-and-tricks.rst | 22 ++++++++++++++--------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git a/docs/source/advanced-tips-and-tricks.rst b/docs/source/advanced-tips-and-tricks.rst
index 3f29365c..bd798106 100644
--- a/docs/source/advanced-tips-and-tricks.rst
+++ b/docs/source/advanced-tips-and-tricks.rst
@@ -72,18 +72,24 @@ accepted option on some SLURM clusters. The error was something like this:
 How to handle job queueing system walltime killing workers
 ----------------------------------------------------------
-In dask-jobqueue, every worker processes run inside a job, and all jobs have a time limit in job queueing systems.
+In dask-jobqueue, every worker process runs inside a job, and all jobs have a time limit in job queueing systems.
 Reaching walltime can be troublesome in several cases:
 
-- when you don't have a lot of room on you HPC platform and have only a few workers at a time (less than what you were hopping for when using scale or adapt). These workers will be killed (and others started) before you workload ends.
-- when you really don't know how long your workload will take: all your workers could be killed before reaching the end. In this case, you'll want to use adaptive clusters so that Dask ensures some workers are always up.
+- when you don't have a lot of room on your HPC platform and have only a few workers at a time
+  (less than what you were hoping for when using scale or adapt). These workers will be
+  killed (and others started) before your workload ends.
+- when you really don't know how long your workload will take: all your workers could be
+  killed before reaching the end. In this case, you'll want to use adaptive clusters so
+  that Dask ensures some workers are always up.
 
-If you don't set the proper parameters, you'll run into KilleWorker exceptions in those two cases.
+If you don't set the proper parameters, you'll run into KilledWorker exceptions in those two cases.
 
-The solution to this problem is to tell Dask up front that the workers have a finit life time:
+The solution to this problem is to tell Dask up front that the workers have a finite lifetime:
 
-- Use `--lifetime` worker option. This will enables infinite workloads using adaptive. Workers will be properly shut down before the scheduling system kills them, and all their states moved.
-- Use `--lifetime-stagger` when dealing with many workers (say > 20): this will allow to avoid workers all terminating at the same time, and so to ease rebalancing tasks and scheduling burden.
+- Use the `--lifetime` worker option. This will enable infinite workloads when using adaptive clusters.
+  Workers will be properly shut down before the scheduling system kills them, and all their states moved.
+- Use `--lifetime-stagger` when dealing with many workers (say > 20): this will prevent workers from
+  terminating at the same time, thus easing task rebalancing and the scheduling burden.
 
 Here is an example of how to use these parameters:
 
@@ -93,7 +99,7 @@ Here is an example of how to use these parameters:
     cluster.adapt(minimum=0, maximum=200)
 
 
-Here is an example of a workflow taking advantage of this, if you wan't to give it a try or adapt it to your use case:
+Here is an example of a workflow taking advantage of this, if you want to give it a try or adapt it to your use case:
 
 .. code-block:: python
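A note for readers adapting these examples: in the patches above the `--lifetime` value (55m) is picked by hand from the one hour walltime. The sketch below is not part of the patch series; it shows one possible way to derive the lifetime from the walltime with an explicit safety margin. The `lifetime_from_walltime` helper and its 5 minute margin are illustrative assumptions, while `PBSCluster`, `walltime`, `cores`, `memory`, `extra` and `adapt` are used exactly as in the documented examples.

.. code-block:: python

    # Illustrative sketch (not part of the patches above): derive the worker
    # --lifetime from the job walltime, keeping a safety margin so workers can
    # retire cleanly before the queueing system kills the job.
    from dask_jobqueue import PBSCluster

    def lifetime_from_walltime(walltime, margin_minutes=5):
        """Return a --lifetime value a few minutes shorter than the walltime.

        The 5 minute default margin is an assumption; tune it to how long your
        workers need to shut down and hand their state over.
        """
        hours, minutes, seconds = (int(part) for part in walltime.split(':'))
        total_minutes = hours * 60 + minutes + seconds // 60
        return f"{max(total_minutes - margin_minutes, 1)}m"

    walltime = '01:00:00'
    cluster = PBSCluster(
        walltime=walltime,
        cores=4,
        memory='16gb',
        extra=[
            "--lifetime", lifetime_from_walltime(walltime),  # "55m" for a 1h walltime
            "--lifetime-stagger", "4m",                      # avoid simultaneous retirement
        ],
    )
    cluster.adapt(minimum=0, maximum=200)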