
Make SLURMCluster.scale() request multiple nodes at once #459

Closed
demaheim opened this issue Aug 18, 2020 · 14 comments

@demaheim

Our cluster prefers multi-node requests over single-node requests.
The problem is that

  • Scaling a SLURMCluster on this system takes a long time.
  • E.g. SLURMCluster.scale(10) makes 10 single-node requests, which takes much longer than requesting 10 nodes at once.

Our current workaround to speed up scaling is to start the cluster via the following script

#SBATCH -J try_dask 
#SBATCH -N 11              # Number of workers + 1
#SBATCH -n 11              
#SBATCH -c 10 
#SBATCH -p S-M  
#SBATCH --account=GT5DSTC 
#SBATCH --time 00:10:00
#SBATCH --mem 60GB

# Select scheduler node
SCHEDULER=$(srun hostname | head -1)

# Start scheduler
srun -N 1 -n 1 --nodelist=$SCHEDULER dask-scheduler --interface ipogif0 --scheduler-file cluster.json &

# Wait until scheduler is ready
sleep 20

# Start workers
srun -N 10 -n 10 --exclude=$SCHEDULER dask-worker --scheduler-file cluster.json &

try_dask.py

In this case, the 11 nodes are requested at once, which is much faster than making 11 single-node requests.

How can SLURMCluster.scale() be adapted to request all 11 nodes at once?

@willirath
Collaborator

I don't see an easy path towards fully supported multi-node jobs in Dask jobqueue as it's designed to serve as a common or at least very similar interface to many different batch scheduling systems.

I'm not familiar with many of the supported batch scheduling systems, but none I know of has something as flexible as the salloc or sbatch --> srun approach you outline here. So maintaining general support for most batch scheduling systems while also implementing this logic likely won't happen. One of the things that would become substantially more difficult if we gave up the notion of 1 worker = 1 job is fine-grained scaling and adaptivity.

There might, however, be a simple workaround that could help with most of your scenarios, if you don't care about fine-grained cluster.scale() or adaptivity:

  • Use the job_extra= kwarg to set -n ... and -N ... in the header of the jobscript.

  • Use the python= kwarg to inject the srun call: python="srun -N 10 -n 10 --exclude=$SCHEDULER python" could work if the python in your default PATH is the right one. Otherwise activate the right environment with env_extra=["<whatever is needed to activate the correct env>", ].

Note that SLURMCluster will set up a scheduler itself, so you don't need that part in the job script.
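
Putting those suggestions together, a rough sketch of what the constructor call might look like (the queue, account, and resource values are just placeholders copied from your batch script; adjust for your setup and dask-jobqueue version):

from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    queue="S-M",
    project="GT5DSTC",
    cores=10,
    memory="60GB",
    walltime="00:10:00",
    interface="ipogif0",
    # ask SLURM for 10 tasks on 10 nodes instead of the default single task
    job_extra=["-N 10", "-n 10"],
    # launch the worker command through srun so one worker starts per task
    python="srun -N 10 -n 10 python",
    # env_extra=["source activate my-env"],  # if the right python is not on PATH
)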

(cc: @kathoef who might be interested in giving this a try as well.)

@willirath
Collaborator

Also, for Dask clusters of static size, dask-mpi could be the right solution.
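
For reference, the dask-mpi pattern looks roughly like this (a sketch assuming dask-mpi is installed and MPI tasks are launched with srun; the script name and task count are placeholders):

# my_dask_script.py -- launched with e.g.: srun -n 11 python my_dask_script.py
from dask_mpi import initialize
from dask.distributed import Client

# rank 0 runs the scheduler, rank 1 continues with this script as the client,
# and the remaining ranks become workers
initialize(interface="ipogif0")
client = Client()  # connects to the scheduler started by initialize()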

@willirath
Collaborator

willirath commented Aug 18, 2020

Just tried this. The hard-coded -n 1 in
https://github.com/dask/dask-jobqueue/blob/master/dask_jobqueue/slurm.py#L67
stops you from just adding more tasks to the job header.
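
For context, the line in question looks roughly like this (paraphrased; see the linked source for the exact code):

# dask_jobqueue/slurm.py: the generated job script always requests a single task
header_lines.append("#SBATCH -n 1")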

@guillaumeeb
Member

Hi @demaheim, see also #196. It is about job arrays, but I think it shares your concern, and it gives some explanation of why this is not implemented in dask-jobqueue.

TL;DR: the current approach is to find workarounds rather than make the dask-jobqueue code more complex. So if you find one for Slurm, it would be very welcome!

@guillaumeeb
Member

Just tried this. The hard-coded -n 1 stops you from just adding more tasks to the job header.

Hey @willirath, could you try to use the header_skip parameter?

@demaheim
Author

Hi @willirath and @guillaumeeb ,
thanks for explaining why my request is difficult to implement in dask-jobqueue.
I tried using

header_skip=['-n', '-N'],
job_extra=['-n 2', '-N 2']

but there are still only single-node requests:

JOBID PAR     NAME     USER ACCOUNT ST       TIME NODE   CPUS NODELIST(REASON)
465932 S-M dask-wor    dheim gt5dstc PD       0:00    1      4       (Priority)
465933 S-M dask-wor    dheim gt5dstc PD       0:00    1      4       (Priority)
465934 S-M dask-wor    dheim gt5dstc PD       0:00    1      4       (Priority)
...

Later I will try

  • python="srun -N 10 -n 10 --exclude=$SCHEDULER python"
  • Consider dask-mpi
  • And think about finding a workaround for Slurm that doesn't make dask-jobqueue more complex.

@willirath
Collaborator

header_skip=['-n', '-N'],
job_extra=['-n 2', '-N 2']

I think header_skip skips lines that contain the listed strings:

if not any(skip in line for skip in self.header_skip)

The following could do:

header_skip=['-n 1', '-N 1']
job_extra=['-n 2', '-N 2']

@willirath
Collaborator

You can check the job script with

print(cluster.job_script())

and make sure the header looks fine.

@demaheim
Author

When I use

header_skip=['-n 1', '-N 1'],
job_extra=['-n 9', '-N 9'])
cluster.scale(1)

it looks good (requesting 9 nodes at once)

  JOBID PAR     NAME     USER ACCOUNT ST       TIME NODE   CPUS NODELIST(REASON)
 465990 S-M try_dask    dheim gt5dstc  R       0:51    1     10         nid00438
 465991 S-M dask-wor    dheim gt5dstc PD       0:00    9     36       (Priority)

cluster.job_script():

#SBATCH -J dask-worker
#SBATCH -p S-M
#SBATCH -A GT5DSTC
#SBATCH --cpus-per-task=4
#SBATCH --mem=56G
#SBATCH -t 0:10:00
#SBATCH -n 9
#SBATCH -N 9

/home/dheim/miniconda3/bin/python -m distributed.cli.dask_worker tcp://10.128.1.185:45527 --nthreads 4 --memory-limit 60.00GB --name name --nanny --death-timeout 60 --interface ipogif0

but when I use numbers > 9, e.g. 10:

header_skip=['-n 1', '-N 1'],
job_extra=['-n 10', '-N 10']
cluster.scale(1)

only one node is requested:

  JOBID PAR     NAME     USER ACCOUNT ST       TIME NODE   CPUS NODELIST(REASON)
 465995 S-M try_dask    dheim gt5dstc  R       0:18    1     10         nid00438
 465996 S-M dask-wor    dheim gt5dstc PD       0:00    1      4       (Priority)

cluster.job_script():

#SBATCH -J dask-worker
#SBATCH -p S-M
#SBATCH -A GT5DSTC
#SBATCH --cpus-per-task=4
#SBATCH --mem=56G
#SBATCH -t 0:10:00

/home/dheim/miniconda3/bin/python -m distributed.cli.dask_worker tcp://10.128.1.185:44112 --nthreads 4 --memory-limit 60.00GB --name name --nanny --death-timeout 60 --interface ipogif0

(sorry, I need to log off the cluster now, I will try again tomorrow)

@willirath
Collaborator

This looks like expected (but somewhat buggy, see #461) behaviour: header_skip=['-n 1', ...] will remove all lines containing "-n 1", including the one for 10 nodes you added with job_extra. A dirty workaround until #461 is fixed would be to add an extra space in the job_extra args 😞
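
A minimal illustration of the substring matching (using the line filter quoted a few comments above):

# header_skip matches by substring, so '-n 1' also catches '#SBATCH -n 10'
header_skip = ["-n 1", "-N 1"]
lines = ["#SBATCH -n 1", "#SBATCH -n 10", "#SBATCH -N 10", "#SBATCH -n  10"]
kept = [line for line in lines if not any(skip in line for skip in header_skip)]
print(kept)  # ['#SBATCH -n  10'] -- only the variant with the extra space survives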

The job script with 9 nodes looks promising. But you'll need to use

python="srun -N 9 -n 9 /home/dheim/miniconda3/bin/python"

to make sure you're really getting 9 tasks each running a worker.
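
Once the job starts, one way to check that all 9 tasks actually came up as workers (a sketch, assuming a Client connected to this cluster):

from dask.distributed import Client

client = Client(cluster)
client.wait_for_workers(9)  # blocks until 9 workers have registered
print(len(client.scheduler_info()["workers"]))  # should print 9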

@demaheim
Author

At first, the dirty workaround looks good:

header_skip=['-n 1', '-N 1'],
job_extra=['-n  10', '-N  10'],

leads to

  JOBID PAR     NAME     USER ACCOUNT ST       TIME NODE   CPUS NODELIST(REASON)
 466348 S-M try_dask    dheim gt5dstc  R       0:10    1     10         nid00001
 466349 S-M dask-wor    dheim gt5dstc PD       0:00   10     40       (Priority)

but

python="srun -N 9 -n 9 /home/dheim/miniconda3/bin/python"

seems to result in many errors

distributed.utils - ERROR - 'tcp://10.128.2.151:40346'
Traceback (most recent call last):
  File "/home/dheim/miniconda3/lib/python3.7/site-packages/distributed/utils.py", line 663, in log_errors
    yield
  File "/home/dheim/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 1923, in remove_worker
    self.stream_comms[address].send({"op": "close", "report": False})
KeyError: 'tcp://10.128.2.151:40346'
distributed.core - ERROR - 'tcp://10.128.2.151:40346'
Traceback (most recent call last):
  File "/home/dheim/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 408, in handle_comm
    result = handler(comm, **msg)
  File "/home/dheim/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 1923, in remove_worker
    self.stream_comms[address].send({"op": "close", "report": False})
KeyError: 'tcp://10.128.2.151:40346'

@ocaisa
Member

ocaisa commented Aug 26, 2020

There are a few potential issues that could be triggering this. One is that (to my understanding) there is likely no guarantee from the launcher (srun in this case) that any process other than the root process can access argc and argv; with MPI you would usually have to distribute these yourself if they were required by other MPI tasks.

The other is that, even if they were available, your 9 workers would all share the same name kwarg, which I'm pretty sure leads to the type of errors that you are seeing.

With the current approach, what you are really looking for is to use dask-mpi (without a scheduler) from within dask-jobqueue... it might not be so hard (especially if you are already willing to hack the headers), but I imagine it is uncharted territory.

I don't have a general solution for this; the job array discussion in #196 would probably serve your needs, but that might be some time away.

@demaheim
Author

Thank you for all your help! I will try dask-mpi and check if it serves our needs!

@guillaumeeb
Member

The other is that even if they were available, your 9 workers would all share the same name kwarg which I'm pretty sure leads to the type of errors that you are seeing.

I think there is a workaround for this with #480.

Closing this one.
