Add lsf #78

Merged
merged 23 commits into dask:master on Aug 1, 2018

Conversation

raybellwaves
Member

working on #4

Thanks to @jhamman for his (in person) help with this. Hopefully I can get this squared away by the end of the week.

FYI I'm testing on Python 3.6 on Pegasus (University of Miami's HPC) and I was getting a non-obvious psutil error (see below). After installing the dependencies I installed a conda version of psutil (conda install -c conda-forge psutil) and it went away.

In [1]: from dask_jobqueue import LSFCluster
---------------------------------------------------------------------------
ImportError                               Traceback (most recent call last)
<ipython-input-1-8ba34d16bb88> in <module>()
----> 1 from dask_jobqueue import LSFCluster

~/PYTHON3/dask-jobqueue/dask_jobqueue/__init__.py in <module>()
      1 # flake8: noqa
      2 from . import config
----> 3 from .core import JobQueueCluster
      4 from .moab import MoabCluster
      5 from .pbs import PBSCluster

~/PYTHON3/dask-jobqueue/dask_jobqueue/core.py in <module>()
      8 import dask
      9 import docrep
---> 10 from distributed import LocalCluster
     11 from distributed.deploy import Cluster
     12 from distributed.utils import get_ip_interface, ignoring, parse_bytes, tmpfile

~/local/bin/miniconda3/envs/d-jq-test/lib/python3.6/site-packages/distributed/__init__.py in <module>()
      3 from . import config
      4 from dask.config import config
----> 5 from .core import connect, rpc
      6 from .deploy import LocalCluster, Adaptive
      7 from .diagnostics import progress

~/local/bin/miniconda3/envs/d-jq-test/lib/python3.6/site-packages/distributed/core.py in <module>()
     23                    unparse_host_port, get_address_host_port)
     24 from .metrics import time
---> 25 from .system_monitor import SystemMonitor
     26 from .utils import (get_traceback, truncate_exception, ignoring, shutting_down,
     27                     PeriodicCallback, parse_timedelta)

~/local/bin/miniconda3/envs/d-jq-test/lib/python3.6/site-packages/distributed/system_monitor.py in <module>()
      2 
      3 from collections import deque
----> 4 import psutil
      5 
      6 from .compatibility import WINDOWS

~/local/bin/miniconda3/envs/d-jq-test/lib/python3.6/site-packages/psutil/__init__.py in <module>()
     97     PROCFS_PATH = "/proc"
     98 
---> 99     from . import _pslinux as _psplatform
    100 
    101     from ._pslinux import IOPRIO_CLASS_BE  # NOQA

~/local/bin/miniconda3/envs/d-jq-test/lib/python3.6/site-packages/psutil/_pslinux.py in <module>()
     24 from . import _common
     25 from . import _psposix
---> 26 from . import _psutil_linux as cext
     27 from . import _psutil_posix as cext_posix
     28 from ._common import ENCODING

ImportError: /nethome/rxb826/local/bin/miniconda3/envs/d-jq-test/lib/python3.6/site-packages/psutil/_psutil_linux.cpython-36m-x86_64-linux-gnu.so: undefined symbol: __intel_sse4_strncpy

extra: ""
env-extra: []
job-cpu: null
job-mem: null
Member

I wonder, can we use the memory and threads * processes entries to remove the need for these entries? This is probably also a question for other dask-jobqueue maintainers, as I know that these appear in other configurations.
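The fallback being suggested might look roughly like the sketch below (an illustration only; the helper name derive_job_resources and its arguments are hypothetical, not the actual core.py code):

def derive_job_resources(processes, threads, memory, job_cpu=None, job_mem=None):
    # Hypothetical fallback: when job-cpu / job-mem are not given explicitly,
    # derive them from the worker-level settings.
    ncpus = job_cpu if job_cpu is not None else processes * threads
    mem = job_mem if job_mem is not None else memory  # e.g. '7GB', parsed downstream
    return ncpus, mem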

@mrocklin
Member

It would be useful to have a basic test, even if it only performs sanity checks on the header. Here is an example for SLURM:

def test_header():
    with SLURMCluster(walltime='00:02:00', processes=4, threads=2, memory='7GB') as cluster:
        assert '#SBATCH' in cluster.job_header
        assert '#SBATCH -J dask-worker' in cluster.job_header
        assert '#SBATCH -n 1' in cluster.job_header
        assert '#SBATCH --cpus-per-task=8' in cluster.job_header
        assert '#SBATCH --mem=27G' in cluster.job_header
        assert '#SBATCH -t 00:02:00' in cluster.job_header
        assert '#SBATCH -p' not in cluster.job_header
        assert '#SBATCH -A' not in cluster.job_header
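An LSF analogue could take the same shape. The sketch below is hypothetical: it assumes the #BSUB flags that show up in the job script later in this thread (-J, -W, -n), plus -q/-P for queue and project, and the exact values would depend on the final implementation:

def test_header():
    with LSFCluster(walltime='00:02', processes=4, threads=2, memory='8GB') as cluster:
        assert '#BSUB' in cluster.job_header
        assert '#BSUB -J dask-worker' in cluster.job_header
        assert '#BSUB -n 8' in cluster.job_header
        assert '#BSUB -W 00:02' in cluster.job_header
        assert '#BSUB -q' not in cluster.job_header
        assert '#BSUB -P' not in cluster.job_header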

@mrocklin
Member

Thanks for working on this!

@lesteve lesteve mentioned this pull request Jun 27, 2018
@jakirkham
Member

Could you please share how this should be tested? In particular, it would be good to have a short script to try starting up Dask on LSF and running some simple computation with it.

@mrocklin
Member

I would expect the following to work for any job-queue cluster:

from dask_jobqueue import LSFCluster
cluster = LSFCluster()

from dask.distributed import Client
client = Client(cluster)

assert client.submit(lambda x: x + 1, 10).result() == 11

@raybellwaves
Member Author

raybellwaves commented Jun 27, 2018

Hitting a wall here.
https://github.com/dask/dask-jobqueue/blob/master/dask_jobqueue/slurm.py is probably closest to the LSF submit.
There is some code in slurm.py which explicitly sets -n to 1 and then sets ncpus using #SBATCH --cpus-per-task, but there is no such option in LSF; see here:
https://github.com/dask/dask-jobqueue/blob/master/dask_jobqueue/slurm.py#L88-L94
Not sure if I need to set -n to 1 at the start?

When I run client.submit(lambda x: x + 1, 10).result() at the moment, it just sits. Actually, when I exit IPython I see KeyError: <Task '<lambda>-ab8232d42c76821e2cfa669075dd420b' no-worker>.

Doing

from dask_jobqueue import LSFCluster
cluster = LSFCluster()
cluster.job_script()

gives
'#!/bin/bash\n\n#BSUB -J dask-worker\n#BSUB -e dask-worker.err\n#BSUB -o dask-worker.out\n#BSUB -W 00:30\n#BSUB -n 8\n#BSUB -M 30518\n\n\n\n/nethome/rxb826/local/bin/miniconda3/envs/d-jq-test/bin/python -m distributed.cli.dask_worker tcp://10.10.0.13:46285 --nthreads 2 --nprocs 4 --memory-limit 8GB --name dask-worker-2 --death-timeout 60\n'

If I manually copy this to a submit script (submit.sh), e.g.

#!/bin/bash

#BSUB -J dask-worker
#BSUB -e dask-worker.err
#BSUB -o dask-worker.out
#BSUB -W 00:30
#BSUB -n 8
#BSUB -M 30518

/nethome/rxb826/local/bin/miniconda3/envs/d-jq-test/bin/python -m distributed.cli.dask_worker tcp://10.10.0.14:48549 --nthreads 2 --nprocs 4 --memory-limit 8GB --name dask-worker-2 --death-timeout 60

and do bsub < submit.sh, it runs, e.g.

Job is submitted to <cpp> project.
Job <16633929> is submitted to default queue <general>.
$ bjobs
JOBID     USER    STAT  QUEUE      FROM_HOST   EXEC_HOST   JOB_NAME   SUBMIT_TIME
16633929  rxb826  RUN   general    login3      8*n248      *sk-worker Jun 28 01:58

but within Python I seem to be struggling to get any workers.

@guillaumeeb
Member

There is some code in slurm.py which explicitly sets -n to 1 and then sets ncpus using #SBATCH --cpus-per-task, but there is no such option in LSF; see here

You probably want to use the following options to indicate the use of one node with several processes:

#BSUB -R "span[hosts=1]"
#BSUB -n 8

see https://www.ibm.com/support/knowledgecenter/en/SSETD4_9.1.3/lsf_admin/span_string.html or https://www.hpc.dtu.dk/?page_id=1401

@guillaumeeb guillaumeeb left a comment

Thanks for your work here, hoping my comments will help you finish this!

logger.debug("Job script: \n %s" % self.job_script())

def _job_id_from_submit_output(self, out):
    return out.split('.')[0].strip()
Member

Here you need to either parse the output string of the job submission, which seems to be something like:

Job is submitted to <cpp> project.
Job <16633929> is submitted to default queue <general>.

But you more likely want to find the correct option for just outputting the job ID after the bsub command. Unfortunately, I did not find such an option after a few minutes of Google searching.
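If the output does end up being parsed, a regular expression keyed to the format above would be enough; a minimal sketch (an illustration, not the code in this PR):

import re

def _job_id_from_submit_output(self, out):
    # Extract the numeric ID from a line such as
    # "Job <16633929> is submitted to default queue <general>."
    match = re.search(r'Job <(\d+)>', out)
    if match is None:
        raise ValueError('Could not parse job ID from bsub output:\n%s' % out)
    return match.group(1)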

""", 4)

# Override class variables
submit_command = 'bsub'
Member

From what I understand from your last comment, the submit_command should be bsub < instead of just bsub.

Member

@raybellwaves any response to this comment?

@raybellwaves
Member Author

raybellwaves commented Jun 28, 2018

Thanks for the comments @guillaumeeb!

I think LSF is unique compared to the others in terms of job submission. bsub is the command, but typing bsub submit.sh does not spool submit.sh (there is some explanation in the examples at the bottom of http://www.glue.umd.edu/lsf-docs/man/bsub.html); you need to do bsub < submit.sh. To work around this I've used the solution in https://stackoverflow.com/questions/45134260/submitting-an-lsf-script-via-pythons-subprocess-popen-without-shell-true
I've therefore edited core.py and may have broken other schedulers.
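For reference, one way to avoid shell=True entirely is to hand the script to bsub on stdin, which is what bsub < submit.sh does in the shell; a minimal standalone sketch (my own illustration, not the workaround used in this PR):

import subprocess

def submit_via_stdin(script_filename):
    # bsub reads the job script from standard input when no script argument
    # is given, so no shell redirection is needed.
    with open(script_filename) as f:
        proc = subprocess.Popen(['bsub'],
                                stdin=f,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
    out, err = proc.communicate()
    return out.decode()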

It now works when I set off workers:

In [1]: from dask_jobqueue import LSFCluster

In [2]: cluster = LSFCluster(walltime='00:02', processes=2, threads=1, memory='4GB')

In [3]: workers = cluster.start_workers(2)
Job is submitted to <cpp> project.

Job is submitted to <cpp> project.


In [4]: from dask.distributed import Client

In [5]: client = Client(cluster)

In [6]: client.submit(lambda x: x + 1, 10).result()
Out[6]: 11

Although it still just hangs without setting off any workers:

In [1]: from dask_jobqueue import LSFCluster

In [2]: from dask.distributed import Client

In [3]: cluster = LSFCluster(walltime='00:02', processes=2, threads=1, memory='4GB')

In [4]: client = Client(cluster)

In [5]: client.submit(lambda x: x + 1, 10).result() # does nothing

@raybellwaves
Member Author

Well, my hack with submit_command in lsf.py and the subprocess.Popen in core.py broke cluster.stop_workers(workers), so I need to think of a solution for bsub < without butchering core.py.

@guillaumeeb
Member

See how this is done in ipyparallel: https://github.com/ipython/ipyparallel/blob/6.1.1/ipyparallel/apps/launcher.py#L1397. Maybe you could try to use a similar syntax.

There is still the problem of the shell=True keyword.

Maybe we should change the way core.py is written and have dedicated start and stop functions that could be easily overridden if needed. Or just add a launch_command() function that the other one will call, and that just does the Popen part. @jhamman, @mrocklin, @lesteve any thoughts?

@lesteve
Member

lesteve commented Jul 3, 2018

Maybe we should change the way core.py is written and have dedicated start and stop functions that could be easily overridden if needed. Or just add a launch_command() function that the other one will call, and that just does the Popen part. @jhamman, @mrocklin, @lesteve any thoughts?

Not sure what the best way is, but it looks like we need some special treatment for LSF indeed since it takes stdin and not the script name. Maybe something like this (I am guessing this is similar to your launch_command suggestion):

# in JobQueueCluster
def submit_job(self, script_filename):
    return self._call(shlex.split(self.submit_command) + [script_filename])

def start_workers(self, n=1):
    ...
    with self.job_file() as fn:
        out = self.submit_job(fn)
        ...
# in LSFCluster
def submit_job(self, script_filename):
    # note popen_kwargs needs to be added to _call so we can pass shell=True
    return self._call(shlex.split(self.submit_command) + ['<', script_filename],
        popen_kwargs={'shell': True})

@raybellwaves
Member Author

raybellwaves commented Jul 4, 2018

Thanks for the discussion on this. I'll wait until a decision is made. Unfortunately, LSF seems to be the black sheep and it looks as though it will have to be handled specifically. A launch_command may work; that way I won't break cluster.stop_workers(workers), which expects the normal subprocess.Popen call.

I can work on the Docker files in the meantime. I've not used Docker before, so if anyone can point me to some resources for setting up LSF in Docker that would be appreciated (actually I'd prefer this to be a separate PR).

Lastly, I'll make the changes to reflect the latest PR. Thanks @mrocklin for your work on that.

@mrocklin
Member

mrocklin commented Jul 4, 2018

Something like what @lesteve proposes seems reasonable to me.

@raybellwaves what do you think we should do?

@raybellwaves
Member Author

Thanks for the suggestion @lesteve. I believe I've got it working for myself and hopefully it should still work for others. I'll have to test tomorrow though, as the queue is jammed.

@lesteve
Member

lesteve commented Jul 11, 2018

Great to hear, let us know when you think this is ready for review!

@mrocklin
Member

mrocklin commented Jul 11, 2018 via email

@raybellwaves raybellwaves changed the title from "WIP: Add lsf" to "Add lsf" on Jul 11, 2018
@raybellwaves
Member Author

raybellwaves commented Jul 11, 2018

Fixing the tests

@mrocklin mrocklin left a comment

Looking good! I'll be happy to see this in. A couple small comments.

    self.shell = True
    return self._call(piped_cmd)
else:
    return self._call(shlex.split(self.submit_command) + [script_filename])
Member

Would it be possible to keep the core solution simple and instead put the LSF-specific implementation on the LSFCluster class?

Member Author

Ah, I see what @lesteve was suggesting.

if walltime is None:
    walltime = dask.config.get('jobqueue.%s.walltime' % self.scheduler_name)
if job_extra is None:
    job_extra = dask.config.get('jobqueue.lsf.job-extra')
Member

Should this also use the self.scheduler_name pattern as above?

Member Author

Yep

@raybellwaves
Member Author

@jhamman good spot. Everything is working again.
I'll spend a little time on suppressing the `Job is submitted to <PROJECT> project.` output before another review.

@raybellwaves
Member Author

raybellwaves commented Jul 18, 2018

The `Job is submitted to <PROJECT> project.` message is the stderr of the bsub command. I redirected it to nowhere in my piped_cmd.

@raybellwaves
Member Author

Pinging @mrocklin @jhamman. Sorry to bother you. This is ready for review when you have time.

@mrocklin mrocklin left a comment

Sorry for the delay @raybellwaves. Generally this looks good. I've highlighted a couple of points that I think we can improve somewhat easily. Let me know what you think.

@@ -280,13 +282,16 @@ def job_file(self):
        f.write(self.job_script())
    yield fn

def submit_job(self, script_filename):
    return self._call(shlex.split(self.submit_command) + [script_filename])
Member

This should probably be a private method so that users don't get the wrong idea that they should use it to submit jobs.

env-extra: []
ncpus: null
mem: null
job-extra: []
Member

Just checking in, does LSF need/use all of these options? I'm slightly concerned that as we copy configs from different systems we may accrue more than is necessary.

@raybellwaves raybellwaves Jul 27, 2018

The only ones I haven't used are extra and env-extra. extra is for additional arguments to pass to dask-worker, so that is probably worth keeping. env-extra adds other commands to the script before launching the worker. I can't see myself using this, but core.py checks for it:

env_extra : list

A future PR could be to move that out of core.py and have users specify it in their individual classes. I'll let you decide that.

""", 4)

# Override class variables
submit_command = 'bsub'
Member

@raybellwaves any response to this comment?

`Job is submitted to <PROJECT> project.` which is the stderr and
`Job <JOBID> is submitted to (default) queue <QUEUE>.` which is the stdout.
Supress the stderr by redirecting it to nowhere.
The `piped_cmd` looks like ['bsub < tmp.sh 2> /dev/null'] """
Member

This docstring looks a bit wonky. It looks more like a developer comment than a documentation string for users. If it is supposed to be a docstring then you might want to follow the numpydoc standard, or take a look at the dask developer notes on docstrings: http://dask.pydata.org/en/latest/develop.html#docstrings

`Job <JOBID> is submitted to (default) queue <QUEUE>.` which is the stdout.
Supress the stderr by redirecting it to nowhere.
The `piped_cmd` looks like ['bsub < tmp.sh 2> /dev/null'] """
self.popen_shell = True
Member

It seems odd to set this here again. It was already set above on the class, right?

Supress the stderr by redirecting it to nowhere.
The `piped_cmd` looks like ['bsub < tmp.sh 2> /dev/null'] """
self.popen_shell = True
piped_cmd = [self.submit_command+' < '+script_filename+' 2> /dev/null']
Member

There are some minor style issues. I recommend running flake8 on the codebase.

mrocklin@carbon:~/workspace/dask-jobqueue$ flake8 dask_jobqueue
dask_jobqueue/lsf.py:5:1: F401 'os' imported but unused
dask_jobqueue/lsf.py:69:13: F841 local variable 'memory' is assigned to but never used
dask_jobqueue/lsf.py:124:41: E226 missing whitespace around arithmetic operator
dask_jobqueue/lsf.py:124:47: E226 missing whitespace around arithmetic operator
dask_jobqueue/lsf.py:124:63: E226 missing whitespace around arithmetic operator

Member

I've opened #106 to discuss including this in the CI


def lsf_format_bytes_ceil(n):
""" Format bytes as text
LSF expects megabytes
Member

@raybellwaves any response to this comment?


def lsf_format_bytes_ceil(n):
""" Format bytes as text
LSF expects megabytes
Member

Also it would be good to format this docstring like http://dask.pydata.org/en/latest/develop.html#docstrings
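For illustration, a numpydoc-style version of this helper might look like the sketch below (the binary-megabyte convention and the exact wording are assumptions, not necessarily what the PR uses):

import math

def lsf_format_bytes_ceil(n):
    """ Format bytes as text.

    LSF expects memory in megabytes.

    Parameters
    ----------
    n : int
        Number of bytes.

    Returns
    -------
    str
        Number of megabytes, rounded up.

    Examples
    --------
    >>> lsf_format_bytes_ceil(1234567890)
    '1178'
    """
    return '%d' % math.ceil(n / (1024 ** 2))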


def stop_jobs(self, jobs):
    """ set `self.popen_shell = False` """
    self.popen_shell = False
Member

Ah, I see, you're using the class state to sneak a parameter into the ._call method. I think it's probably better to pass extra keyword arguments into the ._call method directly and avoid the extra state. I'll write a comment on this in the core.py file.

@@ -322,6 +327,7 @@ def _calls(self, cmds):
for cmd in cmds:
    logger.debug(' '.join(cmd))
    procs.append(subprocess.Popen(cmd,
                                  shell=self.popen_shell,
Member

I recommend that we pass **kwargs, or at least shell= from the _call method into this function call and avoid the state entirely.

def _call(self, cmd, **kwargs):
    return self._calls([cmd], **kwargs)

def _calls(self, cmds, **kwargs):
    ...
    procs.append(subprocess.Popen(cmd,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE,
                                  **kwargs))

Then we can call this like self._call(cmd, shell=True) and avoid mucking about with the popen_shell state.
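The LSF-side override would then be roughly the following (a sketch; the _submit_job name and the piped command mirror the snippets quoted in this review rather than the final merged code):

# In LSFCluster: build the piped command and pass shell=True straight through
# to _call, instead of toggling popen_shell state on the instance.
def _submit_job(self, script_filename):
    piped_cmd = [self.submit_command + ' < ' + script_filename + ' 2> /dev/null']
    return self._call(piped_cmd, shell=True)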

@mrocklin
Member

mrocklin commented Jul 27, 2018 via email

@guillaumeeb guillaumeeb left a comment

Just a comment on stop_jobs.

We are close; I believe that after sorting out this one we will be able to merge. Thanks again for the time taken.

logger.debug("Stopping jobs: %s" % jobs)
if jobs:
jobs = list(jobs)
self._call([self.cancel_command] + list(set(jobs)), shell=False)
Member

As there is no state anymore, and shell=False is the default with Popen, you probably don't need it here, and thus you probably don't need to redefine stop_jobs in the LSF implementation.

Member Author

Thanks. Good spot.

@mrocklin
Member

This now seems fine to me. @raybellwaves can you verify that things work well on your LSF cluster?

@raybellwaves
Member Author

[screenshot: 2018-07-31 at 4:45 PM]

@raybellwaves
Member Author

We can also get feedback from folks in #4 after merging.

@mrocklin
Member

Merging this tomorrow if there are no further comments. If anyone gets to this before I do and is OK with things, I encourage you to merge if you think it's ready.

@jhamman
Member

jhamman commented Jul 31, 2018

Thanks @raybellwaves for sticking with this. The only thing I see this needing is a few docs.

See the {index,examples,configurations,api}.rst files for some good places to document the LSFCluster. Bare minimum would be api.rst.

@mrocklin
Member

mrocklin commented Jul 31, 2018 via email

@guillaumeeb guillaumeeb left a comment

Thanks for all that, I think this is ready.

@guillaumeeb guillaumeeb mentioned this pull request Aug 1, 2018
@mrocklin mrocklin merged commit 2319b22 into dask:master Aug 1, 2018
@mrocklin
Member

mrocklin commented Aug 1, 2018

This is in. Thank you @raybellwaves for implementing this! I think that it will be very valuable.

@lesteve
Member

lesteve commented Aug 1, 2018

Very nice!

@raybellwaves
Member Author

Big thanks to you all for creating the package and your teachings.
