Can dask-jobqueue use multi-node jobs (Was: Creating dask-jobqueue cluster from a single job versus multiple jobs)? #364

Closed
wtbarnes opened this issue Nov 15, 2019 · 10 comments


@wtbarnes
Member

This is not strictly an "issue" but more a question about suggested usage, so if it belongs somewhere else, please direct me there!

I've been working closely with admins of the NASA Pleiades HPC system on how best to support interactive Dask workflows on that system. Pleiades uses PBS. Thus far, my workflow has been to configure a cluster in which a single worker uses all available cores and memory on a single node. For example, for a machine that has 12 cores and 48 GB of memory per node, my jobqueue config is the following:

jobqueue:
    pbs:
        cores: 12
        processes: 1
        memory: 48GB
        interface: ib0
        resource_spec: "select=1:ncpus=12"
        walltime: "00:30:00"

I then request 10-20 nodes by starting an equivalent number of jobs, e.g. by running cluster.scale(10). A lot of the time this configuration works well, but one problem that has cropped up (and is common to many systems) is limited availability of resources: when the cluster is busy, I may have to wait > 30 minutes for even a single job to start, which is not ideal for interactive workflows! The entire Pleiades system is in very high demand, so this is often an issue, particularly on the newer, faster processors.
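
For concreteness, a minimal sketch of that workflow (assuming the config above is picked up from the default dask jobqueue config file; the scale value is illustrative):

from dask_jobqueue import PBSCluster
from dask.distributed import Client

# PBSCluster reads the jobqueue.pbs section shown above from the dask config files
cluster = PBSCluster()
cluster.scale(10)        # submit 10 single-node jobs, each running one dask-worker
client = Client(cluster)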

After raising this issue with the HPC staff, one suggested solution was to use a high-availability queue (called "devel") that allows users to submit only a single job at a time, but has very high availability (i.e. short waiting times). In this case, the suggested pattern would be to submit a single job that requests multiple cores on multiple nodes. For 25 nodes, each with 24 cores and 128 GB of memory, the jobqueue config is:

jobqueue:
  pbs:
    cores: 200
    memory: 2000GB
    processes: 200
    interface: ib0 
    queue: devel
    resource_spec: 'select=25:ncpus=8:model=has'
    walltime: "00:30:00"

Then, the user would call cluster.scale(1) once and be done. This satisfies the one-job restriction of the high-availability queue, reduces wait times since all resources are requested up front, and is also more "friendly" to the scheduler because it does not involve submitting many jobs. This pattern of course does not permit any scaling up or down, but that is a separate issue.

This is quite different from the multi-job workflow I've used previously (and the one that seems to be recommended in the dask-jobqueue docs), and I'm trying to wrap my head around whether it makes sense. My main questions are:

  1. Does this configuration pattern make sense in the context of dask-jobqueue and are there any disadvantages (other than scalability)? In the context of dask-jobqueue, the single-job cluster seems like an anti-pattern to me, but I certainly understand why this is preferable from the HPC admin perspective.
  2. Which node is this single Dask worker running on?
  3. How is work being parallelized across multiple nodes if only a single job is running?

I've experimented with the above single-job workflow (and minor variations on it) and have found that computations (which worked just fine in the multi-job context) will lock up and/or result in a killed worker. However, it is not entirely clear to me why this is happening.

I apologize for the lengthy post! I'm trying to get a sense of the optimal usage pattern here among many different configuration options, and to wrap my head around how all of this actually works. Any advice would be extremely helpful!

@guillaumeeb
Member

guillaumeeb commented Nov 16, 2019

Hi @wtbarnes, thanks for the question!

Does this configuration pattern make sense in the context of dask-jobqueue

Nope, it does not. There has been some discussion about this in the past. Dask-jobqueue is quite simple and does not handle multi-node jobs. As you say, it is an anti-pattern.

In order to do this, you probably want to look at Dask-MPI.
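
Roughly, the Dask-MPI pattern looks like this (a minimal sketch, assuming dask-mpi is installed and the script is launched with mpiexec from inside the single multi-node PBS job; the rank count and worker options are illustrative):

# launched inside the single PBS job, e.g.: mpiexec -n 201 python start_dask.py
# rank 0 runs the scheduler, rank 1 runs this client code, the remaining ranks become workers
from dask_mpi import initialize
from dask.distributed import Client

initialize(interface="ib0", nthreads=1, memory_limit="10GB")
client = Client()  # connects to the scheduler started by initialize()
# ... submit work through client as usual ...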

For your remaining questions: the dask-worker is a process that will run on only one node, namely the first one from your reservation. The other nodes you request from PBS will do nothing and remain idle. Your worker is probably killed because it exceeds the resources available on its node.

Another thing you may try is to stick with dask-jobqueue but submit smaller jobs, e.g. cores=4, memory=16GB. This way they may be scheduled faster than jobs needing entire nodes. But this may not be possible on Pleiades, or the admins may not like it, as it means more jobs submitted to the batch scheduler.
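
For example (a sketch only; the exact values would need tuning for Pleiades), keyword arguments override the values in the config file:

from dask_jobqueue import PBSCluster

# smaller jobs: keyword arguments take precedence over the jobqueue config file
cluster = PBSCluster(cores=4, memory="16GB",
                     resource_spec="select=1:ncpus=4",
                     interface="ib0", walltime="00:30:00")
cluster.scale(30)   # many small jobs instead of a few whole-node jobs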

@wtbarnes
Member Author

@guillaumeeb Thanks for the detailed reply. This is what I suspected, but wanted to make sure my intuition was correct! I'll go ahead and close this issue, but may add comments following further discussion with the Pleiades folks. Thanks for the Dask-mpi suggestion as well! This may be a good solution for their proposed single-job, high-availability queue.

@wtbarnes reopened this Nov 18, 2019
@wtbarnes
Member Author

@guillaumeeb I have to correct something I said previously:

Then, the user would call cluster.scale(1) once and be done.

Rather, for the single-job, multi-node example I gave above, this would be cluster.scale(200). Given that I'm asking for 25 nodes and 8 CPUs per node, this still submits only a single job (but starts 200 workers).

Does this overcome the multi-node limitation of dask-jobqueue? Or will work still only be allocated over a single node?

@lesteve
Member

lesteve commented Nov 19, 2019

I think the answer is the same: dask-jobqueue does not know how to use multi-node jobs. It seems like you understand your job scheduler well, so to see what dask-jobqueue is doing in terms of jobs, print(cluster.job_script()) will show you the job script that is going to be submitted. The argument of cluster.scale tells you how many of these jobs are going to be submitted. This is explained in more detail at https://jobqueue.dask.org/en/latest/howitworks.html. If you think this page can be improved, a PR would be greatly appreciated!

Full disclosure: the scheduler I know the most about is SGE, which does not have the multi-node jobs feature, so I don't really understand how multi-node jobs are used in practice with SLURM or other schedulers.

Side-comment: with the 0.7 release you can use cluster.scale(jobs=1) which makes it easier to reason in terms of jobs rather than Dask workers.
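
For example (purely illustrative):

cluster.scale(jobs=1)   # submit exactly one job, however many workers that job contains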

@jeffcbecker

Now I'm confused. My jobqueue.yaml is as follows:
jobqueue:
  pbs:
    # Dask worker options
    cores: 200                # Total cores per job
    memory: 2000GB            # Total amount of memory per job
    processes: 200            # Number of Python processes per job

    interface: ib0            # Network interface to use like eth0 or ib0
    death-timeout: 60         # Number of seconds to wait if a worker can not find a scheduler
    local-directory: /nobackup/jcbecker/dask   # Location of fast local storage like /scratch or $TMPDIR

    # PBS resource manager options
    queue: devel
    project: null
    walltime: '01:00:00'
    extra: []
    env-extra: []
    resource-spec: 'select=25:ncpus=8:model=has'
    job-extra: []
    log-directory: null

Then in Python I do:
jcbecker@pfe20 506$ python
Python 3.7.3 | packaged by conda-forge | (default, Jul 1 2019, 21:52:21)
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.

>>> from dask_jobqueue import PBSCluster
>>> from dask.distributed import Client
>>> ncores = 200
>>> cluster = PBSCluster()
>>> cluster.scale(ncores)
>>> client = Client(cluster)
>>> print(cluster.job_script())
#!/usr/bin/env bash
#!/usr/bin/env bash

#PBS -N dask-worker
#PBS -q devel
#PBS -l select=25:ncpus=8:model=has
#PBS -l walltime=01:00:00
JOB_ID=${PBS_JOBID%%.*}

/home7/jcbecker/.conda/envs/geo/bin/python -m distributed.cli.dask_worker tcp://10.150.27.18:43961 --nthreads 1 --nprocs 200 --memory-limit 10.00GB --name dask-worker--${JOB_ID}-- --death-timeout 60 --local-directory /nobackup/jcbecker/dask --interface ib0

So PBS gave me 25 nodes on which to run 200 processes. Isn't that how many workers I get according to cluster.job_script()?

Please enlighten me.

@guillaumeeb
Member

To answer @wtbarnes and @jeffcbecker questions:

Does this overcome the multi-node limitation of dask-jobqueue? Or will work still only be allocated over a single node?

So PBS gave me 25 nodes on which to run 200 processes. Isn't that how many workers I get according to cluster.job_script()?

Yes, PBS gives you 25 nodes.
But the Python command you launch doesn't know how to use them. There's no MPI magic there, nor any distributed-SSH-like mechanism. It just starts 200 processes on the first ("mother") node PBS gave you. So all your work will run on that node, and the node will probably be overwhelmed...

Some batch schedulers, such as Slurm, provide abstractions to spawn processes on any node of the reservation. PBS has pbs_dsh, but nothing like that is happening here.

@jeffcbecker

jeffcbecker commented Nov 19, 2019

Thank you for the clarification. I changed my jobqueue.yaml to have the following:
resource-spec: 'select=1:ncpus=24:model=has'
Then I do:

>>> cluster = PBSCluster()
>>> cluster.scale(240)
>>> cluster.job_script()
'#!/usr/bin/env bash\n\n#PBS -N dask-worker\n#PBS -q normal\n#PBS -l select=1:ncpus=8:model=has\n#PBS -l walltime=01:00:00\nJOB_ID=${PBS_JOBID%%.*}\n\n\n\n/home7/jcbecker/.conda/envs/geo/bin/python -m distributed.cli.dask_worker tcp://10.150.27.22:36343 --nthreads 1 --nprocs 24 --memory-limit 4.17GB --name dask-worker--${JOB_ID}-- --death-timeout 60 --local-directory /nobackup/jcbecker/dask --interface ib0 --interface ib0 --interface ib0\n'

And checking with qstat indicates that each job only requested 8 CPUs, not 24 as I specified. Isn't this wrong? Note that each node is dual-socket with 12 cores per socket, so why did Dask change my request from 24 CPUs to 8?

@jeffcbecker

Not sure what changed, but it's working correctly now: each job requests 24 CPUs (cores).

@wtbarnes
Member Author

@guillaumeeb @lesteve Thanks for all of your help on this. I think we have a clearer picture of how to proceed with our cluster configuration on Pleiades. I'm going to close this (again!) as we seem to have resolved our main issue, but will reopen if we run into more problems.

@lesteve changed the title from "Creating dask-jobqueue cluster from a single job versus multiple jobs?" to "Can dask-jobqueue use multi-node jobs (Was: Creating dask-jobqueue cluster from a single job versus multiple jobs)?" Nov 20, 2019
@lesteve
Member

lesteve commented Nov 20, 2019

FYI I changed the title to reflect the discussion. Feel free to edit it or suggest a better title!
