Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Example PBS Script #1260

Closed
mrocklin opened this issue Jul 17, 2017 · 127 comments
Closed

Example PBS Script #1260

mrocklin opened this issue Jul 17, 2017 · 127 comments

Comments

@mrocklin
Copy link
Member

mrocklin commented Jul 17, 2017

People using Dask on traditional job schedulers often depend on PBS scripts. It would be useful to include a plain example in the documentation that users to download, modify, and run.

What we do now

Currently we point users to the setup network docs, and in particular the section about using job schedulers with a shared network file system. The instructions there suggest that users submit two jobs, one for the scheduler and one for the workers:

# Start a dask-scheduler somewhere and write connection information to file
qsub -b y /path/to/dask-scheduler --scheduler-file /path/to/scheduler.json

# Start 100 dask-worker processes in an array job pointing to the same file
qsub -b y -t 1-100 /path/to/dask-worker --scheduler-file /path/to/scheduler.json

However this is flawed because the scheduler or workers may start and run independently from each other. It would be better to place them into a single job, where one special node is told to be the dask-scheduler process and all other nodes are told to be the dask-worker processes. Additionally we would like to offer some guidance on tuning the number of CPUs and pointing workers to use local high-speed scratch disk if available.

PBS script options

Many docs on PBS scripts exist online, but each seems to be made by an IT group at a separate super-computer. It is difficult to tease out what is general to all systems and what is specific to a single supercomputer or job scheduler. After reading from a number of pages I've cobbled together the following example.

#!/bin/bash -login
# Configure these values to change the size of your dask cluster
#PBS -t 1-9                 # Nine nodes.  One scheduler and eight workers
#PBS -l ncpus=4             # Four cores per node.
#PBS -l mem=20GB            # 20 GB of memory per node
#PBS -l walltime=01:00:00   # will run for at most one hour

# Environment variables
export OMP_NUM_THREADS=1

# Write ~/scheduler.json file in home directory
# connect with
# >>> from dask.distributed import Client
# >>> client = Client(scheduler_file='~/scheduler.json')

# Start scheduler on first process, workers on all others
if [[ $PBS_ARRAYID == '1' ]]; then
    dask-scheduler --scheduler-file $HOME/scheduler.json;
else
    dask-worker
    --scheduler-file $HOME/scheduler.json \   
    --nthreads $PBS_NUM_PPN \
    --local-directory $TMPDIR \
    --name worker-$PBS_ARRAYID \
    > $PBS_O_WORKDIR/$PBS_JOBID-$PBS_ARRAYID.out \  
    2> $PBS_O_WORKDIR/$PBS_JOBID-$PBS_ARRAYID.err;  
fi

https://wiki.hpcc.msu.edu/display/hpccdocs/Advanced+Scripting+Using+PBS+Environment+Variables
http://www.pbsworks.com/documentation/support/PBSProUserGuide10.4.pdf

Questions

  • What is the difference between ncpus and ppn?
  • How about -t 1-8 and nodes=8?

Does this actually work? I suspect not. I don't have a convenient testing system for this and would appreciate coverage by a few different groups.

@apatlpo
Copy link
Contributor

apatlpo commented Jul 17, 2017

Hi,
I actually have access to a cluster that uses PBS scripts and would very much like to compute distributed xarray computations on it.
Your code does not work for me right now for multiple reasons, the main one being that dask-distributed is not installed on the cluster. Do you think I could install it locally? or will that require sys admins to step in?
aurelien

@martindurant
Copy link
Member

I believe PBS_ARRAYID is indexed from zero, although this will not matter - we ensure to create exactly one scheduler either way.
During sprints, a mini-script very much like this one did succeed in setting up a cluster and running a joblib-derrived workflow.

@mrocklin
Copy link
Member Author

@apatlpo you can install dask using pip or conda. If you have a local python installation (such as would be created with anaconda) then you should be able to follow the instructions here without help from your system administrator: http://distributed.readthedocs.io/en/latest/install.html

@martindurant
Copy link
Member

One obvious possibility is to have the (conda) python distribution in a directory accessible to all machines. Also, code in knit exists to build an environment to a description or wrap an existing one and place it in a particular location.
The PATH would need to be set to see wherever python ends up, or call python by absolute path.

(pip install against a system python will work too, using the --user flag)

@apatlpo
Copy link
Contributor

apatlpo commented Jul 17, 2017

I believe the conda install went successfully.

After a very quick playing around before dinner time, it's not working yet but
here is what I've got for my PBS script:

#!/bin/bash -login
# Configure these values to change the size of your dask cluster
##PBS -t 1-9                 # Nine nodes.  One scheduler and eight workers
##PBS -l ncpus=4             # Four cores per node.
##PBS -l mem=20GB            # 20 GB of memory per node
#PBS -q mpi
#PBS -l select=2:ncpus=28:mpiprocs=28:mem=120G
#PBS -l walltime=01:00:00

# will run for at most one hour

export PATH="$HOME/.miniconda2/envs/daskdist/bin:$PATH"
echo $PATH

env

echo 'PBS_NODEFILE start ( '
cat $PBS_NODEFILE
echo ') PBS_NODEFILE ends '

# Environment variables
export OMP_NUM_THREADS=1

echo 'PBS_ARRAYID'
echo $PBS_ARRAYID
echo 'PBS_NUM_PPN'
echo $PBS_NUM_PPN
echo 'TMPDIR'
echo $TMPDIR
echo 'PBD_JOBID'
echo $PBD_JOBID

# Start scheduler on first process, workers on all others
if [[ $PBS_ARRAYID == '1' ]]; then
    dask-scheduler --scheduler-file $HOME/scheduler.json;
else
    dask-worker \
    --scheduler-file $HOME/scheduler.json \
    --nthreads $PBS_NUM_PPN \
    --local-directory $TMPDIR \
    --name worker-$PBS_ARRAYID \
    > $PBS_O_WORKDIR/$PBS_JOBID-$PBS_ARRAYID.out \
    2> $PBS_O_WORKDIR/$PBS_JOBID-$PBS_ARRAYID.err;
fi

the output is:

Warning: no access to tty (Bad file descriptor).
Thus no job control in this shell.
/home1/datahome/aponte/.miniconda2/envs/daskdist/bin:/home1/datahome/aponte/.miniconda2/envs/petsc/bin:/usr/pbs/bin/:/home1/datahome/aponte/.miniconda2/bin:/opt/sgi/sbin:/opt/sgi/bin:/usr/local/bin:/usr/bin:/bin:/usr/games:/opt/c3/bin:/opt/pbs/default/bin:/opt/pbs/default/sbin:/usr/lpp/mmfs/bin:/sbin:/bin
MKLROOT=/appli/intel/parallel_studio/parallel_studio_xe_2016_update4/compilers_and_libraries_2016.4.258/linux/mkl
CPLUS_INCLUDE_PATH=/appli/mpt/2.15-p11386/include
MODULE_VERSION_STACK=3.2.10
LESSKEY=/etc/lesskey.bin
MANPATH=/appli/intel/parallel_studio/parallel_studio_xe_2016_update4/compilers_and_libraries_2016.4.258/linux/man/common:/appli/mpt/2.15-p11386/man:/opt/pbs/default/man:/opt/sgi/share/man:/opt/intel/mic/man:/usr/local/man:/usr/share/man:/opt/c3/man:/opt/pbs/default/man:/opt/sgi/share/man
NNTPSERVER=news
NETCDF_CONFIG=/appli/NETCDF/4.3.3.1-mpt-intel/bin/nf-config
HOSTNAME=r1i7n3
XKEYSYMDB=/usr/X11R6/lib/X11/XKeysymDB
INTEL_LICENSE_FILE=28518@vdatarmor-lic
TERM=linux
HOST=r1i7n3
SHELL=/bin/csh
PROFILEREAD=true
HISTSIZE=1000
savehist=1000
PBS_JOBNAME=xarray.job
TMPDIR=/dev/shm/pbs.138488.datarmor0
LIBRARY_PATH=/appli/intel/parallel_studio/parallel_studio_xe_2016_update4/compilers_and_libraries_2016.4.258/linux/mkl/lib/intel64:/appli/mpt/2.15-p11386/lib
FPATH=/appli/intel/parallel_studio/parallel_studio_xe_2016_update4/compilers_and_libraries_2016.4.258/linux/mkl/include:/appli/NETCDF/4.3.3.1-mpt-intel/include:/appli/mpt/2.15-p11386/include
MORE=-sl
PBS_ENVIRONMENT=PBS_BATCH
MPI_LAUNCH=mpiexec_mpt
MIC_LD_LIBRARY_PATH=/appli/intel/parallel_studio/parallel_studio_xe_2016_update4/compilers_and_libraries_2016.4.258/linux/mkl/lib/mic:/appli/intel/parallel_studio/parallel_studio_xe_2016_update4/compilers_and_libraries_2016.4.258/linux/compiler/lib/mic:/opt/intel/mic/myo/lib:/opt/intel/mic/coi/device-linux-release/lib
PBS_O_WORKDIR=/home1/datahome/aponte/xarray/run
NCPUS=28
GROUP=lpo
PBS_TASKNUM=1
USER=aponte
LD_LIBRARY_PATH=/appli/intel/parallel_studio/parallel_studio_xe_2016_update4/compilers_and_libraries_2016.4.258/linux/mkl/lib/intel64_lin:/appli/intel/parallel_studio/parallel_studio_xe_2016_update4/compilers_and_libraries_2016.4.258/linux/mkl/lib/intel64:/appli/NETCDF/4.3.3.1-mpt-intel/lib:/appli/intel/parallel_studio/parallel_studio_xe_2016_update4/compilers_and_libraries_2016.4.258/linux/compiler/lib/intel64:/opt/intel/mic/coi/host-linux-release/lib:/opt/intel/mic/myo/lib:/appli/mpt/2.15-p11386/lib
LS_COLORS=no=00:fi=00:di=01;34:ln=00;36:pi=40;33:so=01;35:do=01;35:bd=40;33;01:cd=40;33;01:or=41;33;01:ex=00;32:*.cmd=00;32:*.exe=01;32:*.com=01;32:*.bat=01;32:*.btm=01;32:*.dll=01;32:*.tar=00;31:*.tbz=00;31:*.tgz=00;31:*.rpm=00;31:*.deb=00;31:*.arj=00;31:*.taz=00;31:*.lzh=00;31:*.lzma=00;31:*.zip=00;31:*.zoo=00;31:*.z=00;31:*.Z=00;31:*.gz=00;31:*.bz2=00;31:*.tb2=00;31:*.tz2=00;31:*.tbz2=00;31:*.xz=00;31:*.avi=01;35:*.bmp=01;35:*.fli=01;35:*.gif=01;35:*.jpg=01;35:*.jpeg=01;35:*.mng=01;35:*.mov=01;35:*.mpg=01;35:*.pcx=01;35:*.pbm=01;35:*.pgm=01;35:*.png=01;35:*.ppm=01;35:*.tga=01;35:*.tif=01;35:*.xbm=01;35:*.xpm=01;35:*.dl=01;35:*.gl=01;35:*.wmv=01;35:*.aiff=00;32:*.au=00;32:*.mid=00;32:*.mp3=00;32:*.ogg=00;32:*.voc=00;32:*.wav=00;32:
CSHRCREAD=true
PBS_O_HOME=/home1/datahome/aponte
WORKDIR=/work/aponte/
XNLSPATH=/usr/X11R6/lib/X11/nls
CPATH=/appli/intel/parallel_studio/parallel_studio_xe_2016_update4/compilers_and_libraries_2016.4.258/linux/mkl/include:/appli/NETCDF/4.3.3.1-mpt-intel/include:/appli/mpt/2.15-p11386/include
OSCAR_HOME=/opt/oscar
HOSTTYPE=x86_64
DATAWORK=/home1/datawork/aponte
FROM_HEADER=
PBS_MOMPORT=15003
PAGER=less
CSHEDIT=emacs
MPI_LIBS=-L/appli/mpt/2.15-p11386/lib -lmpi
XDG_CONFIG_DIRS=/etc/xdg
PBS_O_QUEUE=mpi
NLSPATH=/appli/intel/parallel_studio/parallel_studio_xe_2016_update4/compilers_and_libraries_2016.4.258/linux/mkl/lib/intel64:/appli/intel/parallel_studio/parallel_studio_xe_2016_update4/compilers_and_libraries_2016.4.258/linux/compiler/lib/intel64:/appli/intel/parallel_studio/parallel_studio_xe_2016_update4/debugger_2016/gdb/intel64
MINICOM=-c on
NETCDF_MODULE=4.3.3.1-mpt-intel2016
MODULE_VERSION=3.2.10
MAIL=/var/spool/mail/aponte
PBS_O_LOGNAME=aponte
PATH=/home1/datahome/aponte/.miniconda2/envs/daskdist/bin:/home1/datahome/aponte/.miniconda2/envs/petsc/bin:/usr/pbs/bin/:/home1/datahome/aponte/.miniconda2/bin:/opt/sgi/sbin:/opt/sgi/bin:/usr/local/bin:/usr/bin:/bin:/usr/games:/opt/c3/bin:/opt/pbs/default/bin:/opt/pbs/default/sbin:/usr/lpp/mmfs/bin:/sbin:/bin
CONDA_PATH_BACKUP=/usr/pbs/bin/:/home1/datahome/aponte/.miniconda2/bin:/opt/sgi/sbin:/opt/sgi/bin:/usr/local/bin:/usr/bin:/bin:/usr/games:/opt/c3/bin:/opt/pbs/default/bin:/opt/pbs/default/sbin:/usr/lpp/mmfs/bin:/sbin:/bin
CPU=x86_64
PBS_O_LANG=en_US.UTF-8
PBS_JOBCOOKIE=0000000052AAD434000000002938C2FF
CONDA_PREFIX=/home1/datahome/aponte/.miniconda2/envs/petsc
C_INCLUDE_PATH=/appli/mpt/2.15-p11386/include
INPUTRC=/etc/inputrc
PWD=/home1/datahome/aponte
_LMFILES_=/appli/modulefiles/mpt/2.15:/appli/modulefiles/intel-fc-16/16.0.4.258:/appli/modulefiles/NETCDF/4.3.3.1-mpt-intel2016:/appli/modulefiles/intel-cmkl-16/16.0.4.258
PBS_EXEC=/opt/pbs/default
LANG=en_US.UTF-8
PBS_NODENUM=0
HYB_LAUNCH=mpiexec_mpt omplace
PYTHONSTARTUP=/etc/pythonstart
MODULEPATH=/usr/share/modules/modulefiles:/appli/modulefiles
LOADEDMODULES=mpt/2.15:intel-fc-16/16.0.4.258:NETCDF/4.3.3.1-mpt-intel2016:intel-cmkl-16/16.0.4.258
TZ=GMT
PBS_JOBDIR=/home1/datahome/aponte
MPI_FLAVOR=MPT
PS1=(petsc)
history=1000
C3_RSH=ssh -oConnectTimeout=10 -oForwardX11=no
PBS_O_SHELL=/bin/csh
MPI_VERSION=2.15-p11386
PBS_JOBID=138488.datarmor0
ENVIRONMENT=BATCH
GPG_TTY=not a tty
SHLVL=2
HOME=/home1/datahome/aponte
NETCDF_DIR=/appli/NETCDF/4.3.3.1-mpt-intel/
LANGUAGE=en_US.UTF-8
LESS_ADVANCED_PREPROCESSOR=no
OSTYPE=linux
LS_OPTIONS=-N --color=tty -T 0
PBS_O_HOST=datarmor3.ib0.ice.ifremer.fr
CONDA_PS1_BACKUP=
WINDOWMANAGER=
VENDOR=unknown
MPI_ROOT=/appli/mpt/2.15-p11386
G_FILENAME_ENCODING=@locale,UTF-8,ISO-8859-15,CP1252
LESS=-M -I -R
MACHTYPE=x86_64-suse-linux
LOGNAME=aponte
XDG_DATA_DIRS=/usr/local/share:/usr/share:/opt/oscar/share:/opt/pbs/share
PBS_QUEUE=mpi_2
MODULESHOME=/usr/share/Modules/3.2.10
CONDA_DEFAULT_ENV=petsc
LESSOPEN=lessopen.sh %s
OMP_NUM_THREADS=28
PBS_O_MAIL=/var/mail/aponte
MPI_TYPE_DEPTH=100
LESSCLOSE=lessclose.sh %s %s
PBS_O_SYSTEM=Linux
G_BROKEN_FILENAMES=1
SCRATCH=/home1/scratch/aponte
PBS_NODEFILE=/var/spool/PBS/aux/138488.datarmor0
COLORTERM=1
PBS_O_PATH=/appli/NETCDF/4.3.3.1-mpt-intel/bin:/appli/intel/parallel_studio/parallel_studio_xe_2016_update4/compilers_and_libraries_2016.4.258/linux/bin/intel64:/appli/intel/parallel_studio/parallel_studio_xe_2016_update4/debugger_2016/gdb/intel64:/appli/mpt/2.15-p11386/bin:/opt/sgi/sbin:/opt/sgi/bin:/usr/bin:/bin:/usr/sbin:/sbin:/usr/local/bin:/usr/games:/opt/c3/bin:/appli/pbs/default/bin:/appli/pbs/default/sbin:/usr/lpp/mmfs/bin:/sbin:/bin:/appli/services/bin:.:/home1/datahome/aponte/bin
BASH_FUNC_module()=() {  eval `/usr/share/Modules/$MODULE_VERSION/bin/modulecmd bash $*`
}
_=/usr/bin/env
PBS_NODEFILE start (
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n3.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
r1i7n5.ib0.ice.ifremer.fr
) PBS_NODEFILE ends
PBS_ARRAYID

PBS_NUM_PPN

TMPDIR
/dev/shm/pbs.138488.datarmor0
PBD_JOBID

/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/distributed/utils.py:114: RuntimeWarning: Couldn't detect a suitable IP address for reaching '8.8.8.8', defaulting to '127.0.0.1': [Errno 101] Network is unreachable
  % (host, default, e), RuntimeWarning)
Traceback (most recent call last):
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/bin/dask-worker", line 6, in <module>
    sys.exit(distributed.cli.dask_worker.go())
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/distributed/cli/dask_worker.py", line 231, in go
    main()
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/click/core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/click/core.py", line 697, in main
    rv = self.invoke(ctx)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/click/core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/click/core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/distributed/cli/dask_worker.py", line 167, in main
    raise ValueError("Need to provide scheduler address like\n"
ValueError: Need to provide scheduler address like
dask-worker SCHEDULER_ADDRESS:8786
/var/spool/PBS/mom_priv/jobs/138488.datarmor0.SC: line 45: --scheduler-file: command not found
/var/spool/PBS/mom_priv/jobs/138488.datarmor0.SC: line 46: --nthreads: command not found

If you have any suggestions I'll give it a try, thanks

@mrocklin
Copy link
Member Author

it looks like it's treating each of the individual lines in the dask-worker command as a separate commend. Looking at my text from above it looks like I forgot a trailing slash after dask-worker (which I have now added in an edit. You might want to try adding that that slash after dask-worker.

@apatlpo
Copy link
Contributor

apatlpo commented Jul 17, 2017

The code is not crashing after few minutes with the trailing slash but there is not output.
How can I tell if everything went well?

Note also that in the preceding error code, the variables PBS_ARRAYID, PBS_NUM_PPN were empty.
Shouldn't this be a problem?

@martindurant
Copy link
Member

Can you also try printing $PBS_ARRAY_INDEX ?
You should probably have some .out and .err files with logging information. Similar > output should be added to the scheduler command too. If things are working correctly, there should be no output as such, but you should be able to connect to the scheduler from a python shell with

import os
from dask.distributed import Client
Client(scheduler_file=os.path.join(os.path.expanduser('~'), 'scheduler.json'))

(note that you echo $PBD_JOBID instead of $PBS_JOBID)

@guillaumeeb
Copy link
Member

Hi everybody, and thank you Matthew for this script. I was just planning to make such a tool script for our HPC cluster at CNES. I will start from yours and try to make it work on our supercomputer. I hope I will be able to look at it this week.

But here is a few things I can tell you after a first look:

  • options (e.g ppn, ncpus) may differ accross PBS versions, PBS Pro last version is number 13. For this version the correct option is ncpus.
  • It seems you are trying to use the job array functionnality provided by PBS. There is one big flaw for it in our case: it is kind of a job of jobs, where each child job can be run independently! So PBS may launch each child job one at a time if resources are available one by one (e.g. if only four cores are availables in the cluster with your example, the first job of the array will start alone).

Looking at @apatlpo solution, here is what I can say:

  • You are not using the job array functionality, only a simple PBS job, which is why you don't see the expected env variables, and will never start a scheduler. But this may be the good final solution.
  • mpiproc directive in PBS options seems useless.
  • Instead of $PBS_NUM_PPN, you should use the NCPUS env variable,
  • You are printing PBD_JOB_ID instead of PBS_JOB_ID.
  • You are selecting 2 resources of 28 cores each, so this means you plan to launch only two process that will each use 28 cores, I will start with more processes and less cores for Dask Distributed.
  • In the end, as you are not using a job array, your shell script is only launched once on the PBS Mother node.

Okay so on a second thought, the idea of using job arrays seemed promising, but I believe the flaw I underline makes it not usable. In order to be able to launch several process, so several workers, in only one PBS job, we will need to use something like pbsdsh command, which allows to launch a given command on every resource asked to PBS. I have done this for launching Spark clusters using PBS, and this is probably well suited also for Dask. I will try to improve your proposal with this solution, and share the result here.

@martindurant
Copy link
Member

martindurant commented Jul 17, 2017

@guillaumeeb : I thought for an array, you can specify "I need X tasks, of which Y must run concurrently", although I don't know the syntax for this.

@guillaumeeb
Copy link
Member

@martindurant: You may be right, I will check that tomorrow and tell you, This would be a really nice and clean solution.

@guillaumeeb
Copy link
Member

Another thought, what about the DRMAA proposed contributions on the Setup Network page, like https://github.com/dask/dask-drmaa? It seems a very elegant solution to be able to spawn a Dask cluster on PBS directly from a Python API. As anybody played with this?

@mrocklin
Copy link
Member Author

In my experience users ask far more often for PBS scripts than for help with DRMAA. I'm inclined to meet them with what they already know and use.

That being said, adding DRMAA to the documentation seems like a fine idea to me.

@guillaumeeb
Copy link
Member

guillaumeeb commented Jul 17, 2017

I am also far more used to PBS scripts :). I don't even know if DRMAA tools can work with our PBS Scheduler version. Another thing I need to check. Starting with the script sounds much more simple to me, and still a very good and usable option.

@apatlpo
Copy link
Contributor

apatlpo commented Jul 18, 2017

As anticipated by Guillaume, the script does seem to work, here is the relevant part of the output log:

PBS_NODEFILE start (
r1i5n3.ib0.ice.ifremer.fr
r1i5n4.ib0.ice.ifremer.fr
r1i5n5.ib0.ice.ifremer.fr
r1i5n6.ib0.ice.ifremer.fr
r1i5n7.ib0.ice.ifremer.fr
r1i5n8.ib0.ice.ifremer.fr
r1i5n9.ib0.ice.ifremer.fr
r1i5n10.ib0.ice.ifremer.fr
r1i5n11.ib0.ice.ifremer.fr
r1i5n12.ib0.ice.ifremer.fr
r1i5n13.ib0.ice.ifremer.fr
r1i5n14.ib0.ice.ifremer.fr
r1i5n15.ib0.ice.ifremer.fr
r1i5n16.ib0.ice.ifremer.fr
) PBS_NODEFILE ends
PBS_ARRAYID

PBS_NUM_PPN

TMPDIR
/dev/shm/pbs.138688.datarmor0
PBS_JOBID
138688.datarmor0
PBS_ARRAY_INDEX

NCUPS
2
/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/distributed/utils.py:114: RuntimeWarning: Couldn't detect a suitable IP address for reaching '8.8.8.8', defaulting to '127.0.0.1': [Errno 101] Network is unreachable
  % (host, default, e), RuntimeWarning)
=>> PBS: job killed: walltime 3639 exceeded limit 3600
Terminated

Note that I have modified the number script and am now using upon Guillaume suggestion:
#PBS -l select=14:ncpus=2:mem=120G

Let me know if you want me to try something else.

@guillaumeeb
Copy link
Member

guillaumeeb commented Jul 18, 2017

So I have discussed with a colleague, and it seems there is no such thing as proposed by @martindurant : "I need X tasks, of which Y must run concurrently". The job array philosophy is: "I need to run X tasks, just schedule them as best as you can". But my coworker was very interested by the idea of having a Dask cluster that fills the Gaps available in the PBS resources. So for him the option of a Dask cluster in an array is a really promising one: you ask for 100 resources, but your cluster can begin with only the 10 that are currently available, and grows as new resources become available.

So I believe we must keep the two options, the job array when there is no minimal size required, and we want to start as soon as possible, and the unique job if we know we need only a fixed amount of resources and we prefer to wait for them.

@apatlpo the PBS directive for a job array is:
#PBS -J 1-9:1
Which means I want to launch 9 jobs with a step of 1.
And the env var to use is PBS_ARRAY_INDEX

This should lead to something like this:

#!/bin/bash -login
# Configure these values to change the size of your dask cluster
#PBS -J 1-9:1                 # Nine nodes.  One scheduler and eight workers
#PBS -l select=1:ncpus=2:mem=8G # Each of the jobs select only one resource with 2 cpus and 8GB ram
#PBS -l walltime=01:00:00

# Environment variables
export OMP_NUM_THREADS=1

# Write ~/scheduler.json file in home directory
# connect with
# >>> from dask.distributed import Client
# >>> client = Client(scheduler_file='~/scheduler.json')

# Start scheduler on first process, workers on all others
if [[ $PBS_ARRAY_INDEX == '1' ]]; then
    dask-scheduler --scheduler-file $HOME/scheduler.json;
else
    dask-worker \
    --scheduler-file $HOME/scheduler.json \   
    --nthreads $NCPUS \
    --local-directory $TMPDIR \
    --name worker-$PBS_ARRAY_INDEX \
    > $PBS_O_WORKDIR/$PBS_JOBID-$PBS_ARRAY_INDEX.out \  
    2> $PBS_O_WORKDIR/$PBS_JOBID-$PBS_ARRAY_INDEX.err;  
fi

@guillaumeeb
Copy link
Member

guillaumeeb commented Jul 18, 2017

Hey, seems to work for me:

$ python
Python 2.7.12 (default, Sep 13 2016, 05:19:50) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import os
>>> from dask.distributed import Client
>>> Client(scheduler_file=os.path.join(os.path.expanduser('~'), 'scheduler.json'))
<Client: scheduler=u'tcp://XXX.YYY.ZZZ.59:8786' processes=8 cores=192>

There is a problem with the number of cores though, I should have 8x2=16 cores, but I get 192=8x24, 24 cores being the total available cores of each of my node.

Moreover, how can I had a memory limitation on my workers?

@apatlpo
Copy link
Contributor

apatlpo commented Jul 18, 2017

The code seems to go further with the code you suggested but I sill get errors:

>>> Client(scheduler_file=os.path.join(os.path.expanduser('~'), 'scheduler.json'))
distributed.utils - ERROR - Timed out trying to connect to u'tcp://127.0.0.1:8786' after 5 s: in <distributed.comm.tcp.TCPConnector object at 0x7fffe5714350>: error: [Errno 111] Connection refused
Traceback (most recent call last):
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/distributed/utils.py", line 223, in f
    result[0] = yield make_coro()
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/distributed/client.py", line 680, in _start
    yield self._ensure_connected(timeout=timeout)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/distributed/client.py", line 714, in _ensure_connected
    connection_args=self.connection_args)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/distributed/comm/core.py", line 194, in connect
    _raise(error)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/distributed/comm/core.py", line 177, in _raise
    raise IOError(msg)
IOError: Timed out trying to connect to u'tcp://127.0.0.1:8786' after 5 s: in <distributed.comm.tcp.TCPConnector object at 0x7fffe5714350>: error: [Errno 111] Connection refused

I get in the output log files for the first process:

/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/distributed/utils.py:114: RuntimeWarning: Couldn't detect a suitable IP address for reaching '8.8.8.8', defaulting to '127.0.0.1': [Errno 101] Network is unreachable
  % (host, default, e), RuntimeWarning)
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO -   Scheduler at:      tcp://127.0.0.1:8786
distributed.scheduler - INFO -       bokeh at:              0.0.0.0:8787
distributed.scheduler - INFO -        http at:              0.0.0.0:9786
distributed.scheduler - INFO - Local Directory: /dev/shm/pbs.138838[1].datarmor0/scheduler-Wt5ffF
distributed.scheduler - INFO - -----------------------------------------------
=>> PBS: job killed: walltime 605 exceeded limit 600
Terminated
distributed.scheduler - INFO - End scheduler at 'tcp://:8786'

and for other processes:

/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/distributed/utils.py:114: RuntimeWarning: Couldn't detect a suitable IP address for reaching '8.8.8.8', defaulting to '127.0.0.1': [Errno 101] Network is unreachable
  % (host, default, e), RuntimeWarning)
distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:48538'
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:54958
distributed.worker - INFO -              bokeh at:            127.0.0.1:8789
distributed.worker - INFO -               http at:            127.0.0.1:45485
distributed.worker - INFO -              nanny at:            127.0.0.1:48538
distributed.worker - INFO - Waiting to connect to:       tcp://127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                         56
distributed.worker - INFO -                Memory:                   81.05 GB
distributed.worker - INFO -       Local Directory:              worker-OM1DJZ
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Trying to connect to scheduler: tcp://127.0.0.1:8786
... (lots of similar lines)
distributed.worker - INFO - Trying to connect to scheduler: tcp://127.0.0.1:8786
distributed.worker - INFO - Trying to connect to scheduler: tcp://127.0.0.1:8786
=>> PBS: job killed: walltime 603 exceeded limit 600
Terminated
distributed.dask_worker - INFO - Exiting on signal 15
distributed.dask_worker - INFO - Exiting on signal 15
distributed.dask_worker - INFO - End worker
distributed.nanny - INFO - Closing Nanny at 'tcp://127.0.0.1:48538'
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:54958
tornado.application - ERROR - Exception in callback <functools.partial object at 0x2aaabd483788>
Traceback (most recent call last):
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/ioloop.py", line 605, in _run_callback
    ret = callback()
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/stack_context.py", line 277, in null_wrapper
    return fn(*args, **kwargs)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/ioloop.py", line 626, in _discard_future_result
    future.result()
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/distributed/nanny.py", line 135, in _start
    response = yield self.instantiate()
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/distributed/nanny.py", line 194, in instantiate
    yield self.process.start()
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/distributed/nanny.py", line 300, in start
    yield self._wait_until_running()
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/distributed/nanny.py", line 386, in _wait_until_running
    raise ValueError("Worker not started")
ValueError: Worker not started

@guillaumeeb do you have any suggestions for what I could try?

@guillaumeeb
Copy link
Member

guillaumeeb commented Jul 18, 2017

@apatlpo You seem to have a networking problem, that is weird. Apparently, dask only find the loopback address (127.0.0.1) to bind on. So every worker, and your client, tries to connect to its localhost, which is probably not the host where the scheduler is started.
I am not an expert on network, I don't know why your dask scheduler doesn't use a non loopback address, I don't have this problem. You should ask an IT or Admin guy at your company. Or maybe you should try to use an en variable $HOSTNAME or something like that when starting the dask scheduler. @mrocklin there probably is an option to specify the hostname or IP to use by the scheduler?

@mrocklin
Copy link
Member Author

there probably is an option to specify the hostname or IP to use by the scheduler?

Yes, see the help text. I recommend using --interface, especially if you know you have some fast network interface. https://stackoverflow.com/questions/43881157/how-do-i-use-an-infiniband-network-with-dask

mrocklin@carbon:~$ dask-scheduler  --help
Usage: dask-scheduler [OPTIONS]

Options:
  --host TEXT                    URI, IP or hostname of this server
  --port INTEGER                 Serving port
  --interface TEXT               Preferred network interface like 'eth0' or
                                 'ib0'
  --tls-ca-file PATH             CA cert(s) file for TLS (in PEM format)
  --tls-cert PATH                certificate file for TLS (in PEM format)
  --tls-key PATH                 private key file for TLS (in PEM format)
  --http-port INTEGER            HTTP port for JSON API diagnostics
  --bokeh-port INTEGER           Bokeh port for visual diagnostics
  --bokeh-internal-port INTEGER  Deprecated. Use --bokeh-port instead
  --bokeh / --no-bokeh           Launch Bokeh Web UI  [default: True]
  --show / --no-show             Show web UI
  --bokeh-whitelist TEXT         IP addresses to whitelist for bokeh.
  --bokeh-prefix TEXT            Prefix for the bokeh app
  --prefix TEXT                  Deprecated, see --bokeh-prefix
  --use-xheaders BOOLEAN         User xheaders in bokeh app for ssl
                                 termination in header  [default: False]
  --pid-file TEXT                File to write the process PID
  --scheduler-file TEXT          File to write connection information. This
                                 may be a good way to share connection
                                 information if your cluster is on a shared
                                 network file system.
  --local-directory TEXT         Directory to place scheduler files
  --preload TEXT                 Module that should be loaded by each worker
                                 process like "foo.bar" or "/path/to/foo.py"
  --help                         Show this message and exit.

@mrocklin
Copy link
Member Author

OK, learning about the difference between job arrays that can fill in gaps and larger jobs is useful. While I agree that there are many useful situations for job arrays I'm inclined to set our default published PBS script to the simpler "wait until you have N machines please". I think this means that we should change

#PBS -t 1-9                 # Nine nodes.  One scheduler and eight workers

to

#PBS -l nodes=9             # Nine nodes

@mrocklin
Copy link
Member Author

Cores limitation should be handled by the --nthreads $PBS_NUM_PPN flag, so apparently this isn't working. You can explicitly state a memory cap with --memory-limit XXX where XXX is the number of bytes (like 50e9). By default Dask will look at the total memory, and divide by the fraction of cores that it has been given.

So what is the right solution here for --nthreads? Is there a better environment variable?

@mrocklin
Copy link
Member Author

Printing out the dask-worker options as well for reference:

mrocklin@carbon:~$ dask-worker --help
Usage: dask-worker [OPTIONS] [SCHEDULER]

Options:
  --tls-ca-file PATH            CA cert(s) file for TLS (in PEM format)
  --tls-cert PATH               certificate file for TLS (in PEM format)
  --tls-key PATH                private key file for TLS (in PEM format)
  --worker-port INTEGER         Serving computation port, defaults to random
  --http-port INTEGER           Serving http port, defaults to random
  --nanny-port INTEGER          Serving nanny port, defaults to random
  --bokeh-port INTEGER          Bokeh port, defaults to 8789
  --bokeh / --no-bokeh          Launch Bokeh Web UI  [default: True]
  --host TEXT                   Serving host. Should be an ip address that is
                                visible to the scheduler and other workers.
                                See --interface.
  --interface TEXT              Network interface like 'eth0' or 'ib0'
  --nthreads INTEGER            Number of threads per process.
  --nprocs INTEGER              Number of worker processes.  Defaults to one.
  --name TEXT                   A unique name for this worker like 'worker-1'
  --memory-limit TEXT           Number of bytes before spilling data to disk.
                                This can be an integer (nbytes) float
                                (fraction of total memory) or 'auto'
  --reconnect / --no-reconnect  Reconnect to scheduler if disconnected
  --nanny / --no-nanny          Start workers in nanny process for management
  --pid-file TEXT               File to write the process PID
  --local-directory TEXT        Directory to place worker files
  --resources TEXT              Resources for task constraints like "GPU=2
                                MEM=10e9"
  --scheduler-file TEXT         Filename to JSON encoded scheduler
                                information. Use with dask-scheduler
                                --scheduler-file
  --death-timeout FLOAT         Seconds to wait for a scheduler before closing
  --bokeh-prefix TEXT           Prefix for the bokeh app
  --preload TEXT                Module that should be loaded by each worker
                                process like "foo.bar" or "/path/to/foo.py"
  --help                        Show this message and exit.

@apatlpo
Copy link
Contributor

apatlpo commented Jul 18, 2017

(note that this post was updated, the reason being that I was not waiting long enough before running the Client command in the python shell)

Ok, I was unable to run with 'PBS -l nodes=...' so I stuck with the array option for now

the PBS script looks now like:

#!/bin/bash -login
# Configure these values to change the size of your dask cluster
#PBS -q mpi_1
## last line is equivalent to -l select=1:ncpus=28 but conforms with my cluster queue names
#PBS -J 1-9:1
#PBS -l walltime=00:05:00

# Environment variables
export PATH="$HOME/.miniconda2/envs/daskdist/bin:$PATH"
export OMP_NUM_THREADS=1

echo "NCPUS = $NCPUS"

# Start scheduler on first process, workers on all others
if [[ $PBS_ARRAY_INDEX == '1' ]]; then
    dask-scheduler --interface ib0 --scheduler-file $HOME/scheduler.json ;
else
    dask-worker \
    --scheduler-file $HOME/scheduler.json \
    --nthreads $NCPUS \
    --local-directory $TMPDIR \
    --interface ib0 \
    --name worker-$PBS_ARRAY_INDEX \
    > $PBS_O_WORKDIR/$PBS_JOBID-$PBS_ARRAY_INDEX.out \
    2> $PBS_O_WORKDIR/$PBS_JOBID-$PBS_ARRAY_INDEX.err;
fi

and I got:

>>> Client(scheduler_file=os.path.join(os.path.expanduser('~'), 'scheduler.json'))
<Client: scheduler=u'tcp://10.148.0.111:8786' processes=8 cores=448>
>>> client =Client(scheduler_file=os.path.join(os.path.expanduser('~'), 'scheduler.json'))
>>> client.scheduler_info()
{'services': {'bokeh': 8787, 'http': 9786}, 'workers': {'tcp://10.148.0.36:55799': {'memory_limit': 81051200716.0, 'memory-vms': 477753344, 'name': 'tcp://10.148.0.36:55799', 'executing': 0, 'in_flight': 0, 'pid': 34240, 'last-seen': 1500389830.059022, 'ncores': 56, 'memory-rss': 65957888, 'in_memory': 0, 'host': '10.148.0.36', 'time-delay': 0.003144979476928711, 'services': {'bokeh': 8789, 'http': 56391, 'nanny': 42200}, 'local_directory': u'worker-_q0YWR', 'memory': 477753344, 'ready': 0}, 'tcp://10.148.0.39:35586': {'memory_limit': 81051200716.0, 'memory-vms': 477753344, 'name': 'tcp://10.148.0.39:35586', 'executing': 0, 'in_flight': 0, 'pid': 38951, 'last-seen': 1500389829.986084, 'ncores': 56, 'memory-rss': 65941504, 'in_memory': 0, 'host': '10.148.0.39', 'time-delay': 0.0032291412353515625, 'services': {'bokeh': 8789, 'http': 53815, 'nanny': 60560}, 'local_directory': u'worker-RVzEx9', 'memory': 477753344, 'ready': 0}, 'tcp://10.148.0.29:50932': {'memory_limit': 81051200716.0, 'memory-vms': 477753344, 'name': 'tcp://10.148.0.29:50932', 'executing': 0, 'in_flight': 0, 'pid': 15794, 'last-seen': 1500389829.954113, 'ncores': 56, 'memory-rss': 65949696, 'in_memory': 0, 'host': '10.148.0.29', 'time-delay': 0.0029790401458740234, 'services': {'bokeh': 8789, 'http': 46120, 'nanny': 36149}, 'local_directory': u'worker-5k4rrB', 'memory': 477753344, 'ready': 0}, 'tcp://10.148.0.38:49394': {'memory_limit': 81051200716.0, 'memory-vms': 477753344, 'name': 'tcp://10.148.0.38:49394', 'executing': 0, 'in_flight': 0, 'pid': 7389, 'last-seen': 1500389830.061031, 'ncores': 56, 'memory-rss': 65957888, 'in_memory': 0, 'host': '10.148.0.38', 'time-delay': 0.003715991973876953, 'services': {'bokeh': 8789, 'http': 43717, 'nanny': 57717}, 'local_directory': u'worker-HSuBor', 'memory': 477753344, 'ready': 0}, 'tcp://10.148.1.69:54920': {'memory_limit': 81051200716.0, 'memory-vms': 477761536, 'name': 'tcp://10.148.1.69:54920', 'executing': 0, 'in_flight': 0, 'pid': 15159, 'last-seen': 1500389829.691123, 'ncores': 56, 'memory-rss': 65949696, 'in_memory': 0, 'host': '10.148.1.69', 'time-delay': 9.703636169433594e-05, 'services': {'bokeh': 8789, 'http': 40605, 'nanny': 55985}, 'local_directory': u'worker-jFCvXb', 'memory': 477761536, 'ready': 0}, 'tcp://10.148.0.34:44472': {'memory_limit': 81051200716.0, 'memory-vms': 477753344, 'name': 'tcp://10.148.0.34:44472', 'executing': 0, 'in_flight': 0, 'pid': 29263, 'last-seen': 1500389829.992393, 'ncores': 56, 'memory-rss': 65933312, 'in_memory': 0, 'host': '10.148.0.34', 'time-delay': 0.0028901100158691406, 'services': {'bokeh': 8789, 'http': 43082, 'nanny': 50248}, 'local_directory': u'worker-jaeTGh', 'memory': 477753344, 'ready': 0}, 'tcp://10.148.0.106:57369': {'memory_limit': 81051200716.0, 'memory-vms': 477753344, 'name': 'tcp://10.148.0.106:57369', 'executing': 0, 'in_flight': 0, 'pid': 19254, 'last-seen': 1500389829.966905, 'ncores': 56, 'memory-rss': 65933312, 'in_memory': 0, 'host': '10.148.0.106', 'time-delay': 0.004221916198730469, 'services': {'bokeh': 8789, 'http': 42590, 'nanny': 39881}, 'local_directory': u'worker-58bB_J', 'memory': 477753344, 'ready': 0}, 'tcp://10.148.1.71:55281': {'memory_limit': 81051200716.0, 'memory-vms': 477761536, 'name': 'tcp://10.148.1.71:55281', 'executing': 0, 'in_flight': 0, 'pid': 9464, 'last-seen': 1500389829.746755, 'ncores': 56, 'memory-rss': 65953792, 'in_memory': 0, 'host': '10.148.1.71', 'time-delay': -0.0007750988006591797, 'services': {'bokeh': 8789, 'http': 51486, 'nanny': 43028}, 'local_directory': u'worker-61yD7K', 'memory': 477761536, 'ready': 0}}, 'type': 'Scheduler', 'id': 'Scheduler-c4e9ecba-6bc8-11e7-a7dd-0cc47a3f7923', 'address': 'tcp://10.148.0.111:8786'}

and in the log for the first process:

NCPUS = 28
/home1/datahome/aponte/.miniconda2/envs/daskdist/lib/python2.7/site-packages/distributed/utils.py:114: RuntimeWarning: Couldn't detect a suitable IP address for reaching '8.8.8.8', defaulting to '127.0.0.1': [Errno 101] Network is unreachable
  % (host, default, e), RuntimeWarning)
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO -   Scheduler at:   tcp://10.148.0.111:8786
distributed.scheduler - INFO -       bokeh at:         10.148.0.111:8787
distributed.scheduler - INFO -        http at:         10.148.0.111:9786
distributed.scheduler - INFO - Local Directory: /dev/shm/pbs.139597[1].datarmor0/scheduler-6KeanK
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Register tcp://10.148.0.36:55799
distributed.scheduler - INFO - Register tcp://10.148.0.38:49394
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.148.0.36:55799
...

and for one member of the array:

distributed.nanny - INFO -         Start Nanny at: 'tcp://10.148.0.29:36149'
distributed.worker - INFO -       Start worker at:    tcp://10.148.0.29:50932
distributed.worker - INFO -              bokeh at:          10.148.0.29:8789
distributed.worker - INFO -               http at:          10.148.0.29:46120
distributed.worker - INFO -              nanny at:          10.148.0.29:36149
distributed.worker - INFO - Waiting to connect to:    tcp://10.148.0.111:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                         56
distributed.worker - INFO -                Memory:                   81.05 GB

@mrocklin
Copy link
Member Author

It looks like the --nthreads and --interface keywords aren't getting through somehow.

What does client.scheduler_info() look like?

@apatlpo
Copy link
Contributor

apatlpo commented Jul 18, 2017

I had to update my preceding post, sorry about that.

@mrocklin
Copy link
Member Author

Yeah, it's not seeing anything beneath --scheduler-file. You might try putting all of those lines onto the same line, removing the backslashes.

@guillaumeeb
Copy link
Member

guillaumeeb commented Jul 18, 2017

@apatlpo Be carefull, You are submitting 28 jobs, each using 14x2=28 cores, so at the end you will use 784 cores! Apparently, only 4 jobs were launched at the time you connected with your client: the scheduler and 3 workers.

The good options would be:

#PBS -J 1-28:1
#PBS -l select=1:ncpus=2:mem=8G
#PBS -l walltime=00:05:00

So you have a Dask cluster, but as @mrocklin said, the --nthreads keyword is not taken into account, despite the fact that NCPUS env variable is good.
But the --interface seems to be OK, doesn't it? scheduler and workers are finding each others!

@mrocklin
Copy link
Member Author

But the --interface seems to be OK, doesn't it? scheduler and workers are finding each others!

The 8.8.8.8 error wouldn't come up if it were given an interface. It's just using whatever is default. This tends to be ethernet.

@guillaumeeb
Copy link
Member

Oh, I didn't notice this error. But it looks like the scheduler and workers are now binding on a correct IP and are able to talk to each others.

@mrocklin
Copy link
Member Author

See #1367 for an mpi4py based solution

@stuartcampbell
Copy link

@mrocklin So do you want the pbs/slurm examples or should we put the effort into making the mpi4py solution as good as it can be ?

@mrocklin
Copy link
Member Author

I still think that PBS/SLURM examples would be useful. Having options can be very helpful. Deployment often has unexpected constraints.

@stuartcampbell
Copy link

I completely agree.

@guillaumeeb
Copy link
Member

Hi everyone, I think I will get back at this soon because some people are beginning to show interest on dask distributed at CNES.

But @subhasisb perhaps you could please clarify some points first:

  1. As I said in a previous comment, here is one problem I face:

Major one being that pbsdsh launches shell commands into a bash environment lacking some basic variables, so I lose my python path and installed libraries, thus no more dask-scheduler or dask-worker command available

Actually the problem is that the pbsdsh executes a process with minimal environment, e.g. without bashrc or /etc/profile loading. If I reload all this user environment in the launcher script, it will probably work.
After discussing with my co-workers, I need to try the -V option of qsub command.

What is your thought about that?

  1. In your example above, I don't see any call to pbsdsh, is that intended?

@mrocklin
Copy link
Member Author

mrocklin commented Sep 7, 2017

There was some significant progress on deploying Dask on HPC systems over at pangeo-data/pangeo#2 .

@lzamparo
Copy link

+1 for an LSF example template script. I'd be happy to help with this.

@mrocklin
Copy link
Member Author

Any efforts would be welcome. If you can put something together @lzamparo I'd be happy to offer thoughts. I don't have an LSF system handy.

@lzamparo
Copy link

lzamparo commented Sep 26, 2017

@mrocklin Made an attempt setting up a scheduler & some workers using LSF's job dependency syntax (see my gist).

However, I'm not sure that I'm connecting the workers to the scheduler correctly. While bjobs | less shows me that both scheduler and worker jobs are running, the scheduler.json file shows no associated workers. Also, when I try creating a dask.distributed.Client in an interactive session hangs (see the gist).

Any tips on debugging / error logging in distributed?

Update: closing the interactive session revealed some hopefully useful traceback info, which I appended to the gist's session output.

@mrocklin
Copy link
Member Author

scheduler.json is written by the scheduler when it first starts up and has no workers. I would not expect the workers field to be populated even if things ran successfully. A simple way to determine if workers have connected would be to check the logs for either side.

You might want to check that the network address published by the scheduler in the scheduler.json file is visible to the workers and client. If not then you might want to select a different network interface with --interface or --host. You might want to check that your home directory on all machines is the same. If not then you might want to make sure that you write your file to somewhere on the NFS.

These are the only things that come to mind at the moment.

@lzamparo
Copy link

distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Scheduler at: tcp://10.230.2.65:8786
distributed.scheduler - INFO - http at: :9786
distributed.scheduler - INFO - Local Directory: /tmp/816571.tmpdir/scheduler-mel6fw8u
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Register tcp://10.230.2.65:40549
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.230.2.65:40549
distributed.scheduler - INFO - Register tcp://10.230.2.208:46607
distributed.scheduler - INFO - Register tcp://10.230.2.208:36467
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.230.2.208:46607
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.230.2.208:36467
distributed.scheduler - INFO - Register tcp://10.230.2.208:37416
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.230.2.208:37416
distributed.scheduler - INFO - End scheduler at 'tcp://:8786'

The scheduler's log suggests that the workers can see it and try to register. The filesystem is shared among all compute nodes, no worries there.

@lzamparo
Copy link

distributed.nanny - INFO - Start Nanny at: 'tcp://10.230.2.208:45940'
distributed.worker - INFO - Start worker at: tcp://10.230.2.208:37416
distributed.worker - INFO - Listening to: tcp://10.230.2.208:37416
distributed.worker - INFO - http at: 10.230.2.208:43611
distributed.worker - INFO - nanny at: 10.230.2.208:45940
distributed.worker - INFO - Waiting to connect to: tcp://10.230.2.65:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 72
distributed.worker - INFO - Memory: 324.44 GB
distributed.worker - INFO - Local Directory: worker-yt_936mo
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://10.230.2.65:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Connection to scheduler broken. Reregistering
distributed.worker - INFO - Trying to connect to scheduler: tcp://10.230.2.65:8786
distributed.worker - INFO - Close compute stream
distributed.dask_worker - INFO - Exiting on signal 2
Traceback (most recent call last):
File "", line 1, in
File "/home/zamparol/anaconda3/lib/python3.5/multiprocessing/forkserver.py", line 164, in main
rfds = [key.fileobj for (key, events) in selector.select()]
File "/home/zamparol/anaconda3/lib/python3.5/selectors.py", line 441, in select
fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt
distributed.dask_worker - INFO - End worker
distributed.nanny - INFO - Closing Nanny at 'tcp://10.230.2.208:45940'

This example worker log suggests two problems:

  1. dask is seeing all resources on the node, rather than merely what was assigned to the job (looks like I should have specified resource caps on the dask-worker command line)
  2. The worker lost connection to the scheduler, and seemingly didn't manage to reconnect. The scheduler didn't report that the connections were dropped either, so I don't know what to check further.

@mrocklin
Copy link
Member Author

Yes, see --nthreads and --memory-limit in particular.

It did connect though, which shows that they found each other. Is your client on a machine that is able to see the scheduler? Perhaps everything connected except for that?

@lzamparo
Copy link

@mrocklin yes, all of the client machine, scheduler, and worker nodes are on the same network, so visibility should not be an issue.

@lzamparo
Copy link

Also, I looked at the docs for dask-worker but didn't see any way to specify the number of CPU cores, is it in fact via --nthreads?

@guillaumeeb
Copy link
Member

guillaumeeb commented Oct 30, 2017

Hi everybody,

Found time to work again on this script this morning. Thanks to some examples found in https://github.com/opendatacube github project and the work of a coworker ont it. I used a conda environment like what is done by @mrocklin in pangeo-data/pangeo#2. Here is what I've got:


#!/bin/bash
#PBS -N sample_dask_pbs
#PBS -l select=9:ncpus=4:mem=20G
#PBS -l walltime=01:00:00

# Qsub template for CNES HAL
# Scheduler: PBS

# This writes a scheduler.json file into your home directory
# You can then connect with the following Python code
# >>> from dask.distributed import Client
# >>> client = Client(scheduler_file='~/scheduler.json')

#Environment sourcing
ENV_SOURCE="source ~/.bashrc; export PATH=/home/eh/eynardbg/miniconda3/bin:$PATH; source activate pangeo"
rm -f $PBS_O_WORKDIR/scheduler.json

#Options
export OMP_NUM_THREADS=1
NCPUS=4 #Bug in NCPUS variable in our PBS install
MEMORY_LIMIT="18e9"
#INTERFACE="--interface ib0 "
INTERFACE=""

# Run Dask Scheduler
echo "*** Launching Dask Scheduler ***"
pbsdsh -n 0 -- /bin/bash -c "$ENV_SOURCE; dask-scheduler $INTERFACE --scheduler-file $PBS_O_WORKDIR/scheduler.json  > $PBS_O_WORKDIR/$PBS_JOBID-scheduler-$PBS_TASKNUM.out 2> $PBS_O_WORKDIR/$PBS_JOBID-scheduler-$PBS_TASKNUM.err;"&

#Number of chunks
nbNodes=`cat $PBS_NODEFILE | wc -l`

echo "*** Starting Workers on Other $nbNodes Nodes ***"
for ((i=1; i<$nbNodes; i+=1)); do
    pbsdsh -n ${i} -- /bin/bash -c "$ENV_SOURCE; dask-worker $INTERFACE --scheduler-file $PBS_O_WORKDIR/scheduler.json --nthreads $NCPUS --memory-limit 16e9 --local-directory $TMPDIR --name worker-${i} > $PBS_O_WORKDIR/$PBS_JOBID-worker-${i}.out 2> $PBS_O_WORKDIR/$PBS_JOBID-worker-${i}.err;"&
done

echo "*** Dask cluster is starting ***"
sleep 3600

This seems to work from the client perspective:

In [6]: client
Out[6]: <Client: scheduler='tcp://10.120.40.169:8786' processes=8 cores=32>

I tried to use different network interfaces because I have problem when loading Dask web UI: this takes a lot of time and most of the time it never works. Don't know why yet, if somebody has a hint...

@guillaumeeb
Copy link
Member

Hi everyone,

A quick word to share with you work that has been done on Dask (and also Spark) on PBS at @CNES. PBS scripts to launch Dask are available in this repo: https://github.com/guillaumeeb/big-data-frameworks-on-pbs.

Cheers.

@rabernat
Copy link

rabernat commented Jan 8, 2018

@guillaumeeb, fantastic! Thanks for sharing your progress.

Would you and the CNES folks be interested in collaborating on a project benchmarking the performance of dask vs spark on common analysis workflows?

@gmaze
Copy link

gmaze commented Jan 8, 2018

Hi @rabernat @guillaumeeb, at Ifremer we would be interested in looking at such benchmarking !
Note that I also raised a question with regard to machine learning methods on xarray/dask vs pyspark/MLlib here , some of our classic analysis could be part of the benchmark

@mrocklin
Copy link
Member Author

mrocklin commented Jan 8, 2018

+1 on Dask/Spark benchmarking. This is a frequently asked question.

@guillaumeeb
Copy link
Member

@rabernat (and @gmaze @mrocklin), don't know if you got my email? We would be glad at CNES to collaborate on such a project. We should just define together how we could contribute, on which use cases, and on which place to discuss about it (e.g. Pangeo? Another Dask or Distributed issue?)

Any idea yet?

@rabernat
Copy link

rabernat commented Jan 21, 2018 via email

@alvarouc
Copy link
Contributor

alvarouc commented Mar 2, 2018

@lzamparo Did you figure out your issue? I was experiencing the sample problem. I noticed that at the client I should put the scheduler address directly instead of the scheduler file and it worked.
That is
c = Client('111.99.99.9:8786')
instead of
c = Client(scheduler_file='/path/to/scheduler.json')

@lzamparo
Copy link

lzamparo commented Mar 5, 2018

@alvarouc Thanks for following up. I never did fix this for a bunch of reasons. Good to hear you found a workaround. I presume you still interrogate the /path/to/scheduler.json file to get the IP, and manually craft the string for the Client constructor?

@mrocklin
Copy link
Member Author

This has been resolved by the dask-jobqueue project.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests