Doc on handling worker with walltime #481

Merged · 3 commits · Jan 24, 2021
62 changes: 62 additions & 0 deletions docs/source/advanced-tips-and-tricks.rst
@@ -69,6 +69,68 @@ accepted option on some SLURM clusters. The error was something like this:
sbatch: error: Batch job submission failed: Requested node configuration is not available


How to handle job queueing system walltime killing workers
----------------------------------------------------------

In dask-jobqueue, every worker process runs inside a job, and job queueing systems impose a time limit (walltime) on every job.
Reaching that walltime can be troublesome in two cases:

- when you don't have a lot of room on your HPC platform and only get a few workers at a time
  (fewer than you were hoping for when using scale or adapt). These workers will be
  killed (and others started) before your workload ends.
- when you really don't know how long your workload will take: all your workers could be
  killed before it finishes. In this case, you'll want to use adaptive clusters so
  that Dask ensures some workers are always up.

If you don't set the proper parameters, you'll run into a ``KilledWorker`` exception in both of these cases.

The solution to this problem is to tell Dask up front that the workers have a finite lifetime:

- Use the ``--lifetime`` worker option. This will enable infinite workloads using adaptive clusters.
  Workers will be properly shut down before the scheduling system kills them, and all their state will be moved to other workers.
- Use ``--lifetime-stagger`` when dealing with many workers (say > 20): this will prevent workers from
  terminating at the same time, which eases task rebalancing and reduces the scheduling burden.

Here is an example of how to use these parameters:

.. code-block:: python

cluster = Cluster(walltime='01:00:00', cores=4, memory='16gb', extra=["--lifetime", "55m", "--lifetime-stagger", "4m"])
cluster.adapt(minimum=0, maximum=200)
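
The same lifetime options can also be set through Dask's configuration system instead of the
constructor. Below is a minimal sketch, assuming the PBS backend (the ``jobqueue.pbs``
configuration keys mirror the ``PBSCluster`` keyword arguments; substitute the section name of
your own scheduler):

.. code-block:: python

    import dask
    from dask_jobqueue import PBSCluster

    # Set walltime and worker lifetime through the configuration system;
    # PBSCluster() then picks these values up as its defaults.
    dask.config.set({
        "jobqueue.pbs.walltime": "01:00:00",
        "jobqueue.pbs.cores": 4,
        "jobqueue.pbs.memory": "16GB",
        "jobqueue.pbs.extra": ["--lifetime", "55m", "--lifetime-stagger", "4m"],
    })

    cluster = PBSCluster()
    cluster.adapt(minimum=0, maximum=200)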


Here is an example of a workflow taking advantage of this, if you want to give it a try or adapt it to your use case:

.. code-block:: python

import time
import numpy as np
from dask_jobqueue import PBSCluster as Cluster
from dask import delayed
from dask.distributed import Client, as_completed

# config in $HOME/.config/dask/jobqueue.yaml
cluster = Cluster(walltime='00:01:00', cores=1, memory='4gb')
cluster.adapt(minimum=0, maximum=4)

client = Client(cluster)

# each task takes about 1s, and we have 4 cpus * 1 min * 60 s/min = 240 cpu-seconds, so let's ask for a few more tasks than that
filenames = [f'img{num}.jpg' for num in range(480)]

def features(num_fn):
num, image_fn = num_fn
time.sleep(1) # takes about 1s to compute features on an image
features = np.random.random(246)
return num, features

num_files = len(filenames)
num_features = len(features((0, filenames[0]))[1])  # run once locally to find the number of features per image

X = np.zeros((num_files, num_features), dtype=np.float32)

for future in as_completed(client.map(features, list(enumerate(filenames)))):  # submit everything and gather results as they complete
i, v = future.result()
X[i, :] = v
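
As a hypothetical follow-up to this example, once the loop has drained every future you can
persist the feature matrix and shut the cluster down, so that any remaining queued jobs are
released right away:

.. code-block:: python

    # Save the assembled feature matrix and release the remaining jobs.
    np.save('features.npy', X)

    client.close()
    cluster.close()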